http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml new file mode 100644 index 0000000..d852fe1 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -0,0 +1,56 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-controller-service</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.11</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java new file mode 100644 index 0000000..e5af8ea --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.druid; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.codehaus.jackson.map.ObjectMapper; + +import com.metamx.common.Granularity; +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.ClusteredBeamTuning; +import com.metamx.tranquility.druid.DruidBeamConfig; +import com.metamx.tranquility.druid.DruidBeams; +import com.metamx.tranquility.druid.DruidDimensions; +import com.metamx.tranquility.druid.DruidEnvironment; +import com.metamx.tranquility.druid.DruidLocation; +import com.metamx.tranquility.druid.DruidRollup; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; + +import io.druid.data.input.impl.TimestampSpec; +import 
io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.joda.time.DateTime; +import org.joda.time.Period; + + +@Tags({"Druid", "Timeseries", "OLAP", "ingest"}) +@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. " + + "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided." + + "Details on how describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html") +public class DruidTranquilityController extends AbstractControllerService implements DruidTranquilityService { + private final static String FIREHOSE_PATTERN = "druid:firehose:%s"; + + private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute"); + private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes"); + private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour"); + + private final static List<String> TIME_ORDINALS = Arrays.asList("SECOND", "MINUTE", "FIVE_MINUTE", "TEN_MINUTE", "FIFTEEN_MINUTE", "HOUR", "SIX_HOUR", "DAY", "WEEK", "MONTH", "YEAR"); + + private Tranquilizer tranquilizer = null; + private String transitUri = ""; + + public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder() + .name("druid-cs-data-source") + .displayName("Druid Data Source") + .description("A data source is the Druid equivalent of a database table.") + 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("druid-cs-zk-connect-string") + .displayName("Zookeeper Connection String") + .description("A comma-separated list of host:port pairs, each corresponding to a ZooKeeper server. Ex: localhost:2181") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_BASE_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-base-sleep") + .displayName("Zookeeper Retry Base Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time (in milliseconds) to wait at first before retrying.") + .required(true) + .defaultValue("1000") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_MAX_RETRIES = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-max-retries") + .displayName("Zookeeper Retry Max Retries") + .description("When a connection to Zookeeper needs to be retried, this property specifies how many times to attempt reconnection.") + .required(true) + .defaultValue("20") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor ZOOKEEPER_RETRY_SLEEP_TIME = new PropertyDescriptor.Builder() + .name("druid-cs-zk-retry-sleep") + .displayName("Zookeeper Retry Sleep Time") + .description("When a connection to Zookeeper needs to be retried, this property specifies the amount of time to sleep (in milliseconds) between retries.") + .required(true) + .defaultValue("30000") + .expressionLanguageSupported(true) + 
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-index-service-path") + .displayName("Index Service Path") + .description("Druid Index Service path as defined via the Druid Overlord druid.service property.") + .required(true) + .defaultValue("druid/overlord") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-discovery-path") + .displayName("Discovery Path") + .description("Druid Discovery Path as configured in Druid Common druid.discovery.curator.path property") + .required(true) + .defaultValue("/druid/discovery") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor CLUSTER_PARTITIONS = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-partitions") + .displayName("Cluster Partitions") + .description("The number of partitions in the Druid cluster.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLUSTER_REPLICATION = new PropertyDescriptor.Builder() + .name("druid-cs-cluster-replication") + .displayName("Cluster Replication") + .description("The replication factor for the Druid cluster.") + .required(true) + .defaultValue("1") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder() + .name("druid-cs-timestamp-field") + .displayName("Timestamp field") + .description("The name of the field that will be used as the timestamp. 
Should be in ISO8601 format.") + .required(true) + .defaultValue("timestamp") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor AGGREGATOR_JSON = new PropertyDescriptor.Builder() + .name("druid-cs-aggregators-descriptor") + .displayName("Aggregator JSON") + .description("Tranquility-compliant JSON string that defines what aggregators to apply on ingest." + + "Example: " + + "[" + + "{" + + "\t\"type\" : \"count\"," + + "\t\"name\" : \"count\"," + + "}," + + "{" + + "\t\"name\" : \"value_sum\"," + + "\t\"type\" : \"doubleSum\"," + + "\t\"fieldName\" : \"value\"" + + "}," + + "{" + + "\t\"fieldName\" : \"value\"," + + "\t\"name\" : \"value_min\"," + + "\t\"type\" : \"doubleMin\"" + + "}," + + "{" + + "\t\"type\" : \"doubleMax\"," + + "\t\"name\" : \"value_max\"," + + "\t\"fieldName\" : \"value\"" + + "}" + + "]") + .required(true) + .addValidator((subject, value, context) -> { // Non-empty and valid JSON validator + if (value == null || value.isEmpty()) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build(); + } + try { + DruidTranquilityController.parseJsonString(value); + return new ValidationResult.Builder().subject(subject).input(value).valid(true).build(); + } catch (IllegalArgumentException iae) { + return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " is not valid Aggregator JSON").build(); + } + }) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DIMENSIONS_LIST = new PropertyDescriptor.Builder() + .name("druid-cs-dimensions-list") + .displayName("Dimension Fields") + .description("A comma separated list of field names that will be stored as dimensions on ingest.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + 
public static final PropertyDescriptor SEGMENT_GRANULARITY = new PropertyDescriptor.Builder() +            .name("druid-cs-segment-granularity") +            .displayName("Segment Granularity") +            .description("Time unit by which to group and aggregate/rollup events. The value must be at least as large as the value of Query Granularity.") +            .required(true) +            .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR") +            .defaultValue("TEN_MINUTE") +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +            .build(); + +    public static final PropertyDescriptor QUERY_GRANULARITY = new PropertyDescriptor.Builder() +            .name("druid-cs-query-granularity") +            .displayName("Query Granularity") +            .description("Time unit by which to group and aggregate/rollup events. The value must be less than or equal to the value of Segment Granularity.") +            .required(true) +            .allowableValues("NONE", "SECOND", "MINUTE", "FIFTEEN_MINUTE", "THIRTY_MINUTE", "HOUR", "DAY", "MONTH", "YEAR") +            .defaultValue("MINUTE") +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +            .build(); + +    public static final PropertyDescriptor INDEX_RETRY_PERIOD = new PropertyDescriptor.Builder() +            .name("druid-cs-index-retry-period") +            .displayName("Index Retry Period") +            .description("Period of time for which submission of a failed indexing task will be retried before the task is abandoned.") +            .required(true) +            .allowableValues(PT1M, PT10M, PT60M) +            .defaultValue(PT10M.getValue()) +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +            .build(); + +    public static final PropertyDescriptor WINDOW_PERIOD = new PropertyDescriptor.Builder() +            .name("druid-cs-window-period") +            .displayName("Late Event Grace Period") +            .description("Grace period to allow late arriving events for real time ingest.") +            .required(true) +            .allowableValues(PT1M, PT10M, PT60M) +            .defaultValue(PT10M.getValue()) +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +            .build(); + +    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + 
.name("druid-cs-batch-size") + .displayName("Batch Size") + .description("Maximum number of messages to send at once.") + .required(true) + .defaultValue("2000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MAX_PENDING_BATCHES = new PropertyDescriptor.Builder() + .name("druid-cs-max-pending-batches") + .displayName("Max Pending Batches") + .description("Maximum number of batches that may be in flight before service blocks and waits for one to finish.") + .required(true) + .defaultValue("5") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor LINGER_MILLIS = new PropertyDescriptor.Builder() + .name("druid-cs-linger-millis") + .displayName("Linger (milliseconds)") + .description("Wait this long for batches to collect more messages (up to Batch Size) before sending them. " + + "Set to zero to disable waiting. " + + "Set to -1 to always wait for complete batches before sending. 
") +            .required(true) +            .defaultValue("1000") +            .addValidator(StandardValidators.INTEGER_VALIDATOR) +            .expressionLanguageSupported(true) +            .build(); + +    private static final List<PropertyDescriptor> properties; + +    private volatile CuratorFramework curator; +    private volatile int zkBaseSleepMillis; +    private volatile int zkMaxRetries; +    private volatile int zkSleepMillis; + +    static { +        final List<PropertyDescriptor> props = new ArrayList<>(); +        props.add(DATASOURCE); +        props.add(ZOOKEEPER_CONNECTION_STRING); +        props.add(ZOOKEEPER_RETRY_BASE_SLEEP_TIME); +        props.add(ZOOKEEPER_RETRY_MAX_RETRIES); +        props.add(ZOOKEEPER_RETRY_SLEEP_TIME); +        props.add(DRUID_INDEX_SERVICE_PATH); +        props.add(DRUID_DISCOVERY_PATH); +        props.add(CLUSTER_PARTITIONS); +        props.add(CLUSTER_REPLICATION); +        props.add(DIMENSIONS_LIST); +        props.add(AGGREGATOR_JSON); +        props.add(SEGMENT_GRANULARITY); +        props.add(QUERY_GRANULARITY); +        props.add(WINDOW_PERIOD); +        // INDEX_RETRY_PERIOD is read in onConfigured(); it must be listed here to be exposed and validated +        props.add(INDEX_RETRY_PERIOD); +        props.add(TIMESTAMP_FIELD); +        props.add(MAX_BATCH_SIZE); +        props.add(MAX_PENDING_BATCHES); +        props.add(LINGER_MILLIS); + +        properties = Collections.unmodifiableList(props); +    } + +    @Override +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { +        return properties; +    } + +    @Override +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { +        Set<ValidationResult> results = new HashSet<>(); +        final String segmentGranularity = validationContext.getProperty(SEGMENT_GRANULARITY).getValue(); +        final String queryGranularity = validationContext.getProperty(QUERY_GRANULARITY).getValue(); + +        // Verify that segment granularity is at least as large as query granularity +        if (TIME_ORDINALS.indexOf(segmentGranularity) < TIME_ORDINALS.indexOf(queryGranularity)) { +            results.add(new ValidationResult.Builder().valid(false).explanation( +                    "Segment Granularity must be at least as large as Query Granularity").build()); +        } + +        return results; +    } + +    @OnEnabled +    public void onConfigured(final 
ConfigurationContext context) { +        ComponentLog log = getLogger(); +        log.info("Starting Druid Tranquility Controller Service..."); + +        final String dataSource = context.getProperty(DATASOURCE).evaluateAttributeExpressions().getValue(); +        final String zkConnectString = context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue(); +        zkBaseSleepMillis = context.getProperty(ZOOKEEPER_RETRY_BASE_SLEEP_TIME).evaluateAttributeExpressions().asInteger(); +        // Fix copy-paste bug: max retries must come from ZOOKEEPER_RETRY_MAX_RETRIES, not the base sleep time property +        zkMaxRetries = context.getProperty(ZOOKEEPER_RETRY_MAX_RETRIES).evaluateAttributeExpressions().asInteger(); +        zkSleepMillis = context.getProperty(ZOOKEEPER_RETRY_SLEEP_TIME).evaluateAttributeExpressions().asInteger(); +        final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).evaluateAttributeExpressions().getValue(); +        final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).evaluateAttributeExpressions().getValue(); +        final int clusterPartitions = context.getProperty(CLUSTER_PARTITIONS).evaluateAttributeExpressions().asInteger(); +        final int clusterReplication = context.getProperty(CLUSTER_REPLICATION).evaluateAttributeExpressions().asInteger(); +        final String timestampField = context.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions().getValue(); +        final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue(); +        final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue(); +        final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue(); +        final String indexRetryPeriod = context.getProperty(INDEX_RETRY_PERIOD).getValue(); +        final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).evaluateAttributeExpressions().getValue(); +        final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue(); +        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); +        final int maxPendingBatches = 
context.getProperty(MAX_PENDING_BATCHES).evaluateAttributeExpressions().asInteger(); + final int lingerMillis = context.getProperty(LINGER_MILLIS).evaluateAttributeExpressions().asInteger(); + + transitUri = String.format(FIREHOSE_PATTERN, dataSource) + ";indexServicePath=" + indexService; + + final List<String> dimensions = getDimensions(dimensionsStringList); + final List<AggregatorFactory> aggregator = getAggregatorList(aggregatorJSON); + + final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() { + private static final long serialVersionUID = 1L; + + @Override + public DateTime timestamp(Map<String, Object> theMap) { + return new DateTime(theMap.get(timestampField)); + } + }; + + Iterator<AggregatorFactory> aggIterator = aggregator.iterator(); + AggregatorFactory currFactory; + log.debug("Number of Aggregations Defined: {}", new Object[]{aggregator.size()}); + while (aggIterator.hasNext()) { + currFactory = aggIterator.next(); + log.debug("Verifying Aggregator Definition\n\tAggregator Name: {}\n\tAggregator Type: {}\n\tAggregator Req Fields: {}", + new Object[]{currFactory.getName(), currFactory.getTypeName(), currFactory.requiredFields()}); + } + // Tranquility uses ZooKeeper (through Curator) for coordination. + curator = getCurator(zkConnectString); + curator.start(); + + // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, + // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. 
+ final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null); + + final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication, + segmentGranularity, queryGranularity, windowPeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec); + + tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam); + + tranquilizer.start(); + } + + Tranquilizer<Map<String, Object>> buildTranquilizer(int maxBatchSize, int maxPendingBatches, int lingerMillis, Beam<Map<String, Object>> beam) { + return Tranquilizer.builder() + .maxBatchSize(maxBatchSize) + .maxPendingBatches(maxPendingBatches) + .lingerMillis(lingerMillis) + .blockOnFull(true) + .build(beam); + } + + Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, + String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions, + List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) { + return DruidBeams.builder(timestamper) + .curator(curator) + .discoveryPath(discoveryPath) + .location(DruidLocation.create(DruidEnvironment.create(indexService, FIREHOSE_PATTERN), dataSource)) + .timestampSpec(timestampSpec) + .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity))) + .tuning( + ClusteredBeamTuning + .builder() + .segmentGranularity(getGranularity(segmentGranularity)) + .windowPeriod(new Period(windowPeriod)) + .partitions(clusterPartitions) + .replicants(clusterReplication) + .build() + ) + .druidBeamConfig( + DruidBeamConfig + .builder() + .indexRetryPeriod(new Period(indexRetryPeriod)) + .build()) + .buildBeam(); + } + + @OnDisabled + public void onDisabled() { + if (tranquilizer != null) { + tranquilizer.flush(); + 
tranquilizer.stop(); + tranquilizer = null; + } + + if (curator != null) { + curator.close(); + curator = null; + } + } + + public Tranquilizer getTranquilizer() { + return tranquilizer; + } + + CuratorFramework getCurator(String zkConnectString) { + return CuratorFrameworkFactory + .builder() + .connectString(zkConnectString) + .retryPolicy(new ExponentialBackoffRetry(zkBaseSleepMillis, zkMaxRetries, zkSleepMillis)) + .build(); + } + + @Override + public String getTransitUri() { + return transitUri; + } + + @SuppressWarnings("unchecked") + private static List<Map<String, String>> parseJsonString(String aggregatorJson) { + if (aggregatorJson == null) { + return Collections.EMPTY_LIST; + } + ObjectMapper mapper = new ObjectMapper(); + final List<Map<String, String>> aggSpecList; + try { + aggSpecList = mapper.readValue(aggregatorJson, List.class); + return aggSpecList; + } catch (IOException e) { + throw new IllegalArgumentException("Exception while parsing the aggregrator JSON"); + } + } + + private List<String> getDimensions(String dimensionStringList) { + List<String> dimensionList = new ArrayList<>(); + if (dimensionStringList != null) { + Arrays.stream(dimensionStringList.split(",")) + .filter(StringUtils::isNotBlank) + .map(String::trim) + .forEach(dimensionList::add); + } + return dimensionList; + } + + private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) { + List<AggregatorFactory> aggregatorList = new LinkedList<>(); + List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON); + for (Map<String, String> aggregator : aggregatorInfo) { + + if (aggregator.get("type").equalsIgnoreCase("count")) { + aggregatorList.add(getCountAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { + aggregatorList.add(getDoubleSumAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { + aggregatorList.add(getDoubleMaxAggregator(aggregator)); + } else if 
(aggregator.get("type").equalsIgnoreCase("doublemin")) { + aggregatorList.add(getDoubleMinAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longsum")) { + aggregatorList.add(getLongSumAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longmax")) { + aggregatorList.add(getLongMaxAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longmin")) { + aggregatorList.add(getLongMinAggregator(aggregator)); + } + } + + return aggregatorList; + } + + private AggregatorFactory getLongMinAggregator(Map<String, String> map) { + return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongMaxAggregator(Map<String, String> map) { + return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongSumAggregator(Map<String, String> map) { + return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) { + return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) { + return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) { + return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getCountAggregator(Map<String, String> map) { + return new CountAggregatorFactory(map.get("name")); + } + + private Granularity getGranularity(String granularityString) { + Granularity granularity = Granularity.HOUR; + + switch (granularityString) { + case "SECOND": + granularity = Granularity.SECOND; + break; + case "MINUTE": + granularity = Granularity.MINUTE; + break; + case "FIVE_MINUTE": + granularity = Granularity.FIVE_MINUTE; + break; + case "TEN_MINUTE": + 
granularity = Granularity.TEN_MINUTE; + break; + case "FIFTEEN_MINUTE": + granularity = Granularity.FIFTEEN_MINUTE; + break; + case "HOUR": + granularity = Granularity.HOUR; + break; + case "SIX_HOUR": + granularity = Granularity.SIX_HOUR; + break; + case "DAY": + granularity = Granularity.DAY; + break; + case "WEEK": + granularity = Granularity.WEEK; + break; + case "MONTH": + granularity = Granularity.MONTH; + break; + case "YEAR": + granularity = Granularity.YEAR; + break; + default: + break; + } + return granularity; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..201af17 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.controller.druid.DruidTranquilityController \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java new file mode 100644 index 0000000..56c2616 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/druid/DruidTranquilityControllerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.druid; + + +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import org.apache.curator.framework.CuratorFramework; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class DruidTranquilityControllerTest { + + private TestRunner runner; + private MockDruidTranquilityController service; + + @Before + public void setup() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new MockDruidTranquilityController(); + runner.addControllerService("Client Service", service); + } + + @Test + public void testValid() { + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.DATASOURCE, "test"); + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]"); + 
runner.assertNotValid(service); + runner.setProperty(service, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2"); + runner.assertValid(service); + } + + public static class MockDruidTranquilityController extends DruidTranquilityController { + + Tranquilizer t = mock(Tranquilizer.class); + CuratorFramework c = mock(CuratorFramework.class); + + @Override + public Tranquilizer getTranquilizer() { + return t; + } + + @Override + CuratorFramework getCurator(String zkConnectString) { + return c; + } + } + + public static class TestControllerServiceProcessor extends AbstractProcessor { + + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Client Service") + .description("DruidTranquilityService") + .identifiesControllerService(DruidTranquilityService.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); + propertyDescriptors.add(CLIENT_SERVICE); + return propertyDescriptors; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml new file mode 100644 index 0000000..6a384c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml @@ -0,0 +1,44 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-processors</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 
0000000..f3c8ece --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,209 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..b8c958c --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-druid-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml new file mode 100644 index 0000000..f596c6f --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -0,0 +1,74 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-processors</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.11</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java new file mode 100644 index 0000000..19e47a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.druid; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; + +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import 
org.apache.nifi.serialization.record.util.DataTypeUtils; +import scala.runtime.BoxedUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"druid", "timeseries", "olap", "ingest", "put", "record"}) +@CapabilityDescription("Sends records to Druid for Indexing. Leverages Druid Tranquility Controller service.") +@WritesAttribute(attribute = "record.count", description = "The number of messages that were sent to Druid for this FlowFile. FlowFiles on the success relationship will have a value " + + "of this attribute that indicates the number of records successfully processed by Druid, and the FlowFile content will be only the successful records. This behavior applies " + + "to the failure and dropped relationships as well.") +public class PutDruidRecord extends AbstractSessionFactoryProcessor { + + static final String RECORD_COUNT = "record.count"; + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("putdruid-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("putdruid-record-writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data to outgoing relationships.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder() + .name("putdruid-tranquility-service") + .displayName("Tranquility Service") + .description("Tranquility Service to use for sending events to Druid.") + .required(true) + 
.identifiesControllerService(DruidTranquilityService.class) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship when they are successfully processed by Druid") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship when they cannot be parsed or otherwise processed by Druid") + .build(); + + static final Relationship REL_DROPPED = new Relationship.Builder() + .name("dropped") + .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, etc...") + .build(); + + + public void init(final ProcessorInitializationContext context) { + List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER_FACTORY); + properties.add(RECORD_WRITER_FACTORY); + properties.add(DRUID_TRANQUILITY_SERVICE); + this.properties = Collections.unmodifiableList(properties); + + Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_DROPPED); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + /** + * Parses the record(s), converts each to a Map, and sends via Tranquility to the Druid Indexing Service + * + * @param context The process context + * @param session The process session + */ + @SuppressWarnings("unchecked") + private void processFlowFile(ProcessContext context, final ProcessSession session) { + final ComponentLog log = getLogger(); + + // Get handle on Druid Tranquility session + DruidTranquilityService tranquilityController = 
context.getProperty(DRUID_TRANQUILITY_SERVICE) + .asControllerService(DruidTranquilityService.class); + Tranquilizer<Map<String, Object>> tranquilizer = tranquilityController.getTranquilizer(); + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + // Create the outgoing flow files and output streams + FlowFile droppedFlowFile = session.create(flowFile); + final AtomicInteger droppedFlowFileCount = new AtomicInteger(0); + FlowFile failedFlowFile = session.create(flowFile); + final AtomicInteger failedFlowFileCount = new AtomicInteger(0); + FlowFile successfulFlowFile = session.create(flowFile); + final AtomicInteger successfulFlowFileCount = new AtomicInteger(0); + + final AtomicInteger recordWriteErrors = new AtomicInteger(0); + + int recordCount = 0; + final OutputStream droppedOutputStream = session.write(droppedFlowFile); + final RecordSetWriter droppedRecordWriter; + final OutputStream failedOutputStream = session.write(failedFlowFile); + final RecordSetWriter failedRecordWriter; + final OutputStream successfulOutputStream = session.write(successfulFlowFile); + final RecordSetWriter successfulRecordWriter; + try (final InputStream in = session.read(flowFile)) { + + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + + final Map<String, String> attributes = flowFile.getAttributes(); + + final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger()); + final RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema()); + droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream); + droppedRecordWriter.beginRecordSet(); + failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream); + 
failedRecordWriter.beginRecordSet(); + successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream); + successfulRecordWriter.beginRecordSet(); + + Record r; + while ((r = reader.nextRecord()) != null) { + final Record record = r; + recordCount++; + // Convert each Record to HashMap and send to Druid + Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(r, RecordFieldType.RECORD.getRecordDataType(r.getSchema())); + + log.debug("Tranquilizer Status: {}", new Object[]{tranquilizer.status().toString()}); + // Send data element to Druid asynchronously + Future<BoxedUnit> future = tranquilizer.send(contentMap); + log.debug("Sent Payload to Druid: {}", new Object[]{contentMap}); + + // Wait for Druid to call back with status + future.addEventListener(new FutureEventListener<Object>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof MessageDroppedException) { + // This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data) + log.debug("Record Dropped due to MessageDroppedException: {}, transferring record to dropped.", new Object[]{cause.getMessage()}, cause); + try { + synchronized (droppedRecordWriter) { + droppedRecordWriter.write(record); + droppedRecordWriter.flush(); + droppedFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to dropped, this may result in data loss.", new Object[]{ioe.getMessage()}, ioe); + recordWriteErrors.incrementAndGet(); + } + + } else { + log.error("FlowFile Processing Failed due to: {}", new Object[]{cause.getMessage()}, cause); + try { + synchronized (failedRecordWriter) { + failedRecordWriter.write(record); + failedRecordWriter.flush(); + failedFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to failure, this may result in data loss.", new Object[]{ioe.getMessage()}, ioe); 
+ recordWriteErrors.incrementAndGet(); + } + } + } + + @Override + public void onSuccess(Object value) { + log.debug(" FlowFile Processing Success: {}", new Object[]{value.toString()}); + try { + synchronized (successfulRecordWriter) { + successfulRecordWriter.write(record); + successfulRecordWriter.flush(); + successfulFlowFileCount.incrementAndGet(); + } + } catch (final IOException ioe) { + log.error("Error transferring record to success, this may result in data loss. " + + "However the record was successfully processed by Druid", new Object[]{ioe.getMessage()}, ioe); + recordWriteErrors.incrementAndGet(); + } + } + }); + + } + + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + log.error("FlowFile Processing Failed due to: {}", new Object[]{e.getMessage()}, e); + // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles() + flowFile = session.putAttribute(flowFile, RECORD_COUNT, Integer.toString(recordCount)); + session.transfer(flowFile, REL_FAILURE); + try { + droppedOutputStream.close(); + session.remove(droppedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with dropped records.", ioe); + } + try { + failedOutputStream.close(); + session.remove(failedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with failed records.", ioe); + } + try { + successfulOutputStream.close(); + session.remove(successfulFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with successful records.", ioe); + } + session.commit(); + return; + } + + if (recordCount == 0) { + // Send original (empty) flow file to success, remove the rest + flowFile = session.putAttribute(flowFile, RECORD_COUNT, "0"); + session.transfer(flowFile, REL_SUCCESS); + try { + droppedOutputStream.close(); + session.remove(droppedFlowFile); + } catch (IOException ioe) { + log.error("Error closing 
output stream for FlowFile with dropped records.", ioe); + } + try { + failedOutputStream.close(); + session.remove(failedFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with failed records.", ioe); + } + try { + successfulOutputStream.close(); + session.remove(successfulFlowFile); + } catch (IOException ioe) { + log.error("Error closing output stream for FlowFile with successful records.", ioe); + } + } else { + + // Wait for all the records to finish processing + while (recordCount != (droppedFlowFileCount.get() + failedFlowFileCount.get() + successfulFlowFileCount.get() + recordWriteErrors.get())) { + Thread.yield(); + } + + // Send partitioned flow files out to their relationships (or remove them if empty) + + try { + droppedRecordWriter.finishRecordSet(); + droppedRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with dropped records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (droppedFlowFileCount.get() > 0) { + droppedFlowFile = session.putAttribute(droppedFlowFile, RECORD_COUNT, Integer.toString(droppedFlowFileCount.get())); + session.transfer(droppedFlowFile, REL_DROPPED); + } else { + session.remove(droppedFlowFile); + } + + try { + failedRecordWriter.finishRecordSet(); + failedRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with failed records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (failedFlowFileCount.get() > 0) { + failedFlowFile = session.putAttribute(failedFlowFile, RECORD_COUNT, Integer.toString(failedFlowFileCount.get())); + session.transfer(failedFlowFile, REL_FAILURE); + } else { + session.remove(failedFlowFile); + } + + try { + successfulRecordWriter.finishRecordSet(); + successfulRecordWriter.close(); + } catch (IOException ioe) { + log.error("Error closing FlowFile with successful 
records: {}", new Object[]{ioe.getMessage()}, ioe); + session.rollback(); + throw new ProcessException(ioe); + } + if (successfulFlowFileCount.get() > 0) { + successfulFlowFile = session.putAttribute(successfulFlowFile, RECORD_COUNT, Integer.toString(successfulFlowFileCount.get())); + session.transfer(successfulFlowFile, REL_SUCCESS); + session.getProvenanceReporter().send(successfulFlowFile, tranquilityController.getTransitUri()); + } else { + session.remove(successfulFlowFile); + } + + session.remove(flowFile); + } + + session.commit(); + } + + public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException { + final ProcessSession session = factory.createSession(); + processFlowFile(context, session); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..25ef747 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.processors.druid.PutDruidRecord \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java new file mode 100644 index 0000000..695212c --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.druid; + +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; +import com.twitter.finagle.Status; +import com.twitter.util.Awaitable; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.TimeoutException; +import com.twitter.util.Try; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.query.aggregation.AggregatorFactory; +import org.apache.curator.framework.CuratorFramework; +import scala.Function1; +import scala.Option; +import scala.runtime.BoxedUnit; + +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MockDruidTranquilityController extends DruidTranquilityController { + + private final Tranquilizer t; + private final CuratorFramework cf; + private int numCalls = 0; + + public MockDruidTranquilityController() { + this(-1, -1); + } + + /** + * Creates a mock/stub Druid controller for testing. The failAfterN parameter must be higher than the dropAfterN parameter in order for messages to be dropped. + * + * @param dropAfterN The number of records after which to start calling the "dropped" callback, namely onFailure(MessageDroppedException) + * @param failAfterN The number of records after which to start calling the "failure" callback, namely onFailure(Exception) + */ + public MockDruidTranquilityController(final int dropAfterN, final int failAfterN) { + t = mock(Tranquilizer.class); + final Future<BoxedUnit> future = new Future<BoxedUnit>() { + + FutureEventListener<? 
super BoxedUnit> listener; + + @Override + public Future<BoxedUnit> addEventListener(FutureEventListener<? super BoxedUnit> listener) { + this.listener = listener; + numCalls++; + if (dropAfterN >= 0 && numCalls > failAfterN) { + listener.onFailure(new Exception()); + } else if (dropAfterN >= 0 && numCalls > dropAfterN) { + listener.onFailure(MessageDroppedException.Instance()); + } else { + listener.onSuccess(BoxedUnit.UNIT); + } + return this; + } + + @Override + public Awaitable<BoxedUnit> ready(Duration timeout, CanAwait permit) throws InterruptedException, TimeoutException { + return null; + } + + @Override + public BoxedUnit result(Duration timeout, CanAwait permit) throws Exception { + return null; + } + + @Override + public boolean isReady(CanAwait permit) { + return true; + } + + @Override + public Future<BoxedUnit> respond(Function1<Try<BoxedUnit>, BoxedUnit> k) { + return null; + } + + @Override + public Option<Try<BoxedUnit>> poll() { + return null; + } + + @Override + public void raise(Throwable interrupt) { + + } + + @Override + public <B> Future<B> transform(Function1<Try<BoxedUnit>, Future<B>> f) { + return null; + } + }; + when(t.send(anyObject())).thenReturn(future); + when(t.status()).thenReturn(new Status() { + }); + cf = mock(CuratorFramework.class); + } + + @Override + public Tranquilizer getTranquilizer() { + return t; + } + + @Override + CuratorFramework getCurator(String zkConnectString) { + return cf; + } + + @SuppressWarnings("unchecked") + @Override + Tranquilizer<Map<String, Object>> buildTranquilizer(int maxBatchSize, int maxPendingBatches, int lingerMillis, Beam<Map<String, Object>> beam) { + return t; + } + + @SuppressWarnings("unchecked") + @Override + Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication, + String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions, + 
List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) { + return mock(Beam.class); + } + + @Override + public String getTransitUri() { + return ""; + } + +}
