Repository: nifi Updated Branches: refs/heads/master d93d53817 -> 7e2910399
NIFI-4428: - Implement PutDruid Processor and Controller update added provenance report added parameters for batch control WIP Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fa0a34a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fa0a34a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fa0a34a Branch: refs/heads/master Commit: 7fa0a34aba23ea390720a8f41aefbe59f26b15c9 Parents: d93d538 Author: vvaks <[email protected]> Authored: Wed Sep 27 13:23:23 2017 -0400 Committer: joewitt <[email protected]> Committed: Tue Jan 16 13:07:56 2018 -0500 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 10 + .../pom.xml | 35 ++ .../nifi-druid-controller-service-api/pom.xml | 84 ++++ .../controller/api/DruidTranquilityService.java | 28 ++ .../nifi-druid-controller-service/pom.xml | 95 ++++ .../controller/DruidTranquilityController.java | 452 +++++++++++++++++++ ...org.apache.nifi.controller.ControllerService | 16 + .../DruidTranquilityControllerTest.java | 22 + .../nifi-druid-bundle/nifi-druid-nar/pom.xml | 39 ++ .../nifi-druid-processors/pom.xml | 95 ++++ .../org/apache/nifi/processors/PutDruid.java | 196 ++++++++ .../org.apache.nifi.processor.Processor | 16 + .../apache/nifi/processors/PutDruidTest.java | 22 + nifi-nar-bundles/nifi-druid-bundle/pom.xml | 41 ++ pom.xml | 12 + 15 files changed, 1163 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 49ef70a..d576261 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -561,6 +561,16 @@ language governing permissions and limitations under the License. 
--> <artifactId>nifi-livy-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml new file mode 100644 index 0000000..6f4f606 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml @@ -0,0 +1,35 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-controller-service-api-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml new file mode 100644 index 0000000..97eac38 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml @@ -0,0 +1,84 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-controller-service-api</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.10</artifactId> + <version>0.8.2</version> + <exclusions> + <exclusion> + <groupId>io.druid</groupId> + <artifactId>druid-console</artifactId> + </exclusion> + <exclusion> + <groupId>io.druid</groupId> + <artifactId>druid-aws-common</artifactId> + </exclusion> + <exclusion> + <groupId>com.twitter</groupId> + <artifactId>finagle-core_2.10</artifactId> + </exclusion> + <exclusion> + <groupId>c3p0</groupId> + <artifactId>c3p0</artifactId> + </exclusion> + <exclusion> + <groupId>io.tesla.aether</groupId> + <artifactId>tesla-aether</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish</groupId> + <artifactId>javax.el</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.aether</groupId> + <artifactId>aether-api</artifactId> + </exclusion> + <exclusion> + <groupId>com.maxmind.geoip2</groupId> + <artifactId>geoip2</artifactId> + </exclusion> + <exclusion> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> 
+</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java new file mode 100644 index 0000000..705303f --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.api; + +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; + +import com.metamx.tranquility.tranquilizer.Tranquilizer; + +public interface DruidTranquilityService extends ControllerService{ + Tranquilizer<Map<String,Object>> getTranquilizer(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml new file mode 100644 index 0000000..2714969 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml @@ -0,0 +1,95 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-controller-service</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.10</artifactId> + <version>0.8.2</version> + <exclusions> + <exclusion> + <groupId>io.druid</groupId> + <artifactId>druid-console</artifactId> + </exclusion> + <exclusion> + <groupId>io.druid</groupId> + <artifactId>druid-aws-common</artifactId> + </exclusion> + <exclusion> + <groupId>c3p0</groupId> + <artifactId>c3p0</artifactId> + </exclusion> + <exclusion> + <groupId>io.tesla.aether</groupId> + <artifactId>tesla-aether</artifactId> + </exclusion> + <exclusion> + <groupId>org.glassfish</groupId> + <artifactId>javax.el</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.aether</groupId> + <artifactId>aether-api</artifactId> + </exclusion> + <exclusion> + 
<groupId>com.maxmind.geoip2</groupId> + <artifactId>geoip2</artifactId> + </exclusion> + <exclusion> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java new file mode 100644 index 0000000..6d4ee19 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.api.DruidTranquilityService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.DateTime; +import org.joda.time.Period; + +import com.metamx.common.Granularity; +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.ClusteredBeamTuning; +import com.metamx.tranquility.druid.DruidBeamConfig; +import com.metamx.tranquility.druid.DruidBeams; +import com.metamx.tranquility.druid.DruidDimensions; +import com.metamx.tranquility.druid.DruidEnvironment; +import com.metamx.tranquility.druid.DruidLocation; +import com.metamx.tranquility.druid.DruidRollup; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.metamx.tranquility.typeclass.Timestamper; + +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import 
io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; + +@Tags({"Druid", "Timeseries", "OLAP", "ingest"}) +@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. " + + "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided. " + + "Details on how to describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html") +public class DruidTranquilityController extends AbstractControllerService implements DruidTranquilityService { + private String firehosePattern = "druid:firehose:%s"; + private int clusterPartitions = 1; + private int clusterReplication = 1; + private String indexRetryPeriod = "PT10M"; + + private Tranquilizer tranquilizer = null; + + public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder() + .name("druid-cs-data-source") + .displayName("Druid Data Source") + .description("Druid Data Source") //TODO description, example + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor CONNECT_STRING = new PropertyDescriptor.Builder() + .name("druid-cs-zk-connect-string") + .displayName("Zookeeper Connection String") + .description("ZK Connect String for Druid") //TODO example + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-index-service-path") + .displayName("Index Service Path") + .description("Druid Index Service path as defined via the Druid Overlord druid.service property.") + .required(true) + .defaultValue("druid/overlord") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +
.expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new PropertyDescriptor.Builder() + .name("druid-cs-discovery-path") + .displayName("Discovery Path") + .description("Druid Discovery Path as configured in Druid Common druid.discovery.curator.path property") + .required(true) + .defaultValue("/druid/discovery") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder() + .name("druid-cs-timestamp-field") + .displayName("Timestamp field") + .description("The name of the field that will be used as the timestamp. Should be in ISO format.") + .required(true) + .defaultValue("timestamp") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor AGGREGATOR_JSON = new PropertyDescriptor.Builder() + .name("druid-cs-aggregators-descriptor") + .displayName("Aggregator JSON") + .description("Tranquility-compliant JSON string that defines what aggregators to apply on ingest." 
+ + " Example: " + + "[" + + "{" + + "\t\"type\" : \"count\"," + + "\t\"name\" : \"count\"" + + "}," + + "{" + + "\t\"name\" : \"value_sum\"," + + "\t\"type\" : \"doubleSum\"," + + "\t\"fieldName\" : \"value\"" + + "}," + + "{" + + "\t\"fieldName\" : \"value\"," + + "\t\"name\" : \"value_min\"," + + "\t\"type\" : \"doubleMin\"" + + "}," + + "{" + + "\t\"type\" : \"doubleMax\"," + + "\t\"name\" : \"value_max\"," + + "\t\"fieldName\" : \"value\"" + + "}" + + "]") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor DIMENSIONS_LIST = new PropertyDescriptor.Builder() + .name("druid-cs-dimensions-list") + .displayName("Dimension Fields") + .description("A comma separated list of field names that will be stored as dimensions on ingest.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor SEGMENT_GRANULARITY = new PropertyDescriptor.Builder() + .name("druid-cs-segment-granularity") + .displayName("Segment Granularity") + .description("Time unit by which to group and aggregate/rollup events.") + .required(true) + .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.segment.granularity variable") + .defaultValue("MINUTE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY_GRANULARITY = new PropertyDescriptor.Builder() + .name("druid-cs-query-granularity") + .displayName("Query Granularity") + .description("Time unit by which to group and aggregate/rollup events. 
The value must be at least as large as the value of Segment Granularity.") + .required(true) + .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.query.granularity variable") + .defaultValue("TEN_MINUTE") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WINDOW_PERIOD = new PropertyDescriptor.Builder() + .name("druid-cs-window-period") + .displayName("Late Event Grace Period") + .description("Grace period to allow late arriving events for real time ingest.") + .required(true) + .allowableValues("PT1M", "PT10M", "PT60M")// TODO possibly friendly name + .defaultValue("PT10M") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("druid-cs-batch-size") + .displayName("Batch Size") + .description("Maximum number of messages to send at once.") + .required(true) + .defaultValue("2000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_PENDING_BATCHES = new PropertyDescriptor.Builder() + .name("druid-cs-max-pending-batches") + .displayName("Max Pending Batches") + .description("Maximum number of batches that may be in flight before service blocks and waits for one to finish.") + .required(true) + .defaultValue("5") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor LINGER_MILLIS = new PropertyDescriptor.Builder() + .name("druid-cs-linger-millis") + .displayName("Linger (milliseconds)") + .description("Wait this long for batches to collect more messages (up to Batch Size) before sending them. " + + "Set to zero to disable waiting. " + + "Set to -1 to always wait for complete batches before sending. 
") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(DATASOURCE); + props.add(CONNECT_STRING); + props.add(DRUID_INDEX_SERVICE_PATH); + props.add(DRUID_DISCOVERY_PATH); + props.add(DIMENSIONS_LIST); + props.add(AGGREGATOR_JSON); + props.add(SEGMENT_GRANULARITY); + props.add(QUERY_GRANULARITY); + props.add(WINDOW_PERIOD); + props.add(TIMESTAMP_FIELD); + props.add(MAX_BATCH_SIZE); + props.add(MAX_PENDING_BATCHES); + props.add(LINGER_MILLIS); + + properties = Collections.unmodifiableList(props); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + ComponentLog log = getLogger(); + log.info("Starting Druid Tranquility Controller Service..."); + + final String dataSource = context.getProperty(DATASOURCE).getValue(); + final String zkConnectString = context.getProperty(CONNECT_STRING).getValue(); + final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).getValue(); + final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).getValue(); + final String timestampField = context.getProperty(TIMESTAMP_FIELD).getValue(); + final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue(); + final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue(); + final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue(); + final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).getValue(); + final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue(); + final int maxBatchSize = Integer.valueOf(context.getProperty(MAX_BATCH_SIZE).getValue()); + final int maxPendingBatches = 
Integer.valueOf(context.getProperty(MAX_PENDING_BATCHES).getValue()); + final int lingerMillis = Integer.valueOf(context.getProperty(LINGER_MILLIS).getValue()); + + final List<String> dimensions = getDimensions(dimensionsStringList); + final List<AggregatorFactory> aggregator = getAggregatorList(aggregatorJSON); + + final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() { + private static final long serialVersionUID = 1L; + + @Override + public DateTime timestamp(Map<String, Object> theMap) { + return new DateTime(theMap.get(timestampField)); + } + }; + + Iterator<AggregatorFactory> aggIterator = aggregator.iterator(); + AggregatorFactory currFactory; + log.debug("Number of Aggregations Defined: " + aggregator.size()); + while (aggIterator.hasNext()) { + currFactory = aggIterator.next(); + log.debug("Verifying Aggregator Definition"); + log.debug("Aggregator Name: " + currFactory.getName()); + log.debug("Aggregator Type: " + currFactory.getTypeName()); + log.debug("Aggregator Req Fields: " + currFactory.requiredFields()); + } + // Tranquility uses ZooKeeper (through Curator) for coordination. + final CuratorFramework curator = CuratorFrameworkFactory + .builder() + .connectString(zkConnectString) + .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) // TODO expose as properties, maybe fibonacci backoff + .build(); + curator.start(); + + // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, + // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. 
+ final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null); + + final Beam<Map<String, Object>> beam = DruidBeams.builder(timestamper) + .curator(curator) + .discoveryPath(discoveryPath) + .location(DruidLocation.create(DruidEnvironment.create(indexService, firehosePattern), dataSource)) + .timestampSpec(timestampSpec) + .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity))) + .tuning( + ClusteredBeamTuning + .builder() + .segmentGranularity(getSegmentGranularity(segmentGranularity)) + .windowPeriod(new Period(windowPeriod)) + .partitions(clusterPartitions) + .replicants(clusterReplication) + .build() + ) + .druidBeamConfig( + DruidBeamConfig + .builder() + .indexRetryPeriod(new Period(indexRetryPeriod)) + .build()) + .buildBeam(); + + tranquilizer = Tranquilizer.builder() + .maxBatchSize(maxBatchSize) + .maxPendingBatches(maxPendingBatches) + .lingerMillis(lingerMillis) + .blockOnFull(true) + .build(beam); + + tranquilizer.start(); + } + + public Tranquilizer getTranquilizer() { + return tranquilizer; + } + + private List<Map<String, String>> parseJsonString(String aggregatorJson) { + ObjectMapper mapper = new ObjectMapper(); + List<Map<String, String>> aggSpecList = null; + try { + aggSpecList = mapper.readValue(aggregatorJson, List.class); + return aggSpecList; + } catch (IOException e) { + throw new IllegalArgumentException("Exception while parsing the aggregator JSON"); + } + } + + private List<String> getDimensions(String dimensionStringList) { + List<String> dimensionList = new LinkedList(Arrays.asList(dimensionStringList.split(","))); + return dimensionList; + } + + private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) { + List<AggregatorFactory> aggregatorList = new LinkedList<>(); + List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON); + for (Map<String, String> aggregator : aggregatorInfo) { + + if 
(aggregator.get("type").equalsIgnoreCase("count")) { + aggregatorList.add(getCountAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) { + aggregatorList.add(getDoubleSumAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) { + aggregatorList.add(getDoubleMaxAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) { + aggregatorList.add(getDoubleMinAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longsum")) { + aggregatorList.add(getLongSumAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longmax")) { + aggregatorList.add(getLongMaxAggregator(aggregator)); + } else if (aggregator.get("type").equalsIgnoreCase("longmin")) { + aggregatorList.add(getLongMinAggregator(aggregator)); + } + } + + return aggregatorList; + } + + private AggregatorFactory getLongMinAggregator(Map<String, String> map) { + return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongMaxAggregator(Map<String, String> map) { + return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getLongSumAggregator(Map<String, String> map) { + return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) { + return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) { + return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) { + return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName")); + } + + private AggregatorFactory getCountAggregator(Map<String, String> map) { + return new CountAggregatorFactory(map.get("name")); + 
} + + private Granularity getSegmentGranularity(String segmentGranularity) { + Granularity granularity = Granularity.HOUR; + + switch (segmentGranularity) { + case "SECOND": + granularity = Granularity.SECOND; + break; + case "MINUTE": + granularity = Granularity.MINUTE; + break; + case "FIVE_MINUTE": + granularity = Granularity.FIVE_MINUTE; + break; + case "TEN_MINUTE": + granularity = Granularity.TEN_MINUTE; + break; + case "FIFTEEN_MINUTE": + granularity = Granularity.FIFTEEN_MINUTE; + break; + case "HOUR": + granularity = Granularity.HOUR; + break; + case "SIX_HOUR": + granularity = Granularity.SIX_HOUR; + break; + case "DAY": + granularity = Granularity.DAY; + break; + case "WEEK": + granularity = Granularity.WEEK; + break; + case "MONTH": + granularity = Granularity.MONTH; + break; + case "YEAR": + granularity = Granularity.YEAR; + break; + default: + break; + } + return granularity; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..53d6d06 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.controller.DruidTranquilityController \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java new file mode 100644 index 0000000..95d5e9d --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + + +public class DruidTranquilityControllerTest { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml new file mode 100644 index 0000000..746a077 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml @@ -0,0 +1,39 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-processors</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml new file mode 100644 index 0000000..6ce35f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml @@ -0,0 +1,95 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-druid-bundle</artifactId>
        <version>1.5.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-druid-processors</artifactId>

    <dependencies>
        <!--
            Core NiFi artifacts use the same version as this bundle's parent
            (they were previously pinned to the released 1.3.0, which mixed two
            NiFi versions inside one NAR). nifi-api is supplied by the framework
            at runtime, so it is marked "provided" to keep it out of the NAR.
        -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>1.5.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <version>1.5.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
            <version>1.5.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-druid-controller-service-api</artifactId>
            <version>1.5.0-SNAPSHOT</version>
        </dependency>
        <!-- Tranquility client used by PutDruid; the exclusions below are unchanged. -->
        <dependency>
            <groupId>io.druid</groupId>
            <artifactId>tranquility-core_2.10</artifactId>
            <version>0.8.2</version>
            <exclusions>
                <exclusion>
                    <groupId>io.druid</groupId>
                    <artifactId>druid-console</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.druid</groupId>
                    <artifactId>druid-aws-common</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>c3p0</groupId>
                    <artifactId>c3p0</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.tesla.aether</groupId>
                    <artifactId>tesla-aether</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jersey</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.aether</groupId>
                    <artifactId>aether-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.maxmind.geoip2</groupId>
                    <artifactId>geoip2</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>net.java.dev.jets3t</groupId>
                    <artifactId>jets3t</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java new file mode 100644 index 0000000..3134d5a --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; + +import org.codehaus.jackson.map.ObjectMapper; + +import org.apache.nifi.controller.api.DruidTranquilityService; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.twitter.util.Await; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; + +import scala.runtime.BoxedUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"Druid", "Timeseries", "OLAP", 
"ingest"}) +@CapabilityDescription("Sends events to Apache Druid for Indexing. " + + "Leverages Druid Tranquility Controller service." + + "Incoming flow files are expected to contain 1 or many JSON objects, one JSON object per line") +public class PutDruid extends AbstractSessionFactoryProcessor { + + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private final Map<Object, String> messageStatus = new HashMap<>(); + + public static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder() + .name("putdruid-tranquility-service") + .displayName("Tranquility Service") + .description("Tranquility Service to use for sending events to Druid") + .required(true) + .identifiesControllerService(DruidTranquilityService.class) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Success relationship") + .build(); + + public static final Relationship REL_FAIL = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship when they cannot be parsed") + .build(); + + public static final Relationship REL_DROPPED = new Relationship.Builder() + .name("dropped") + .description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...") + .build(); + + public void init(final ProcessorInitializationContext context) { + List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DRUID_TRANQUILITY_SERVICE); + this.properties = Collections.unmodifiableList(properties); + + Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_DROPPED); + relationships.add(REL_FAIL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected 
List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + //Method breaks down incoming flow file and sends it to Druid Indexing service + private void processFlowFile(ProcessContext context, ProcessSession session) { + final ComponentLog log = getLogger(); + + //Get handle on Druid Tranquility session + DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE) + .asControllerService(DruidTranquilityService.class); + Tranquilizer<Map<String, Object>> tranquilizer = tranquilityController.getTranquilizer(); + + final FlowFile flowFile = session.get(); + if (flowFile == null || flowFile.getSize() == 0) { + return; + } + + //Get data from flow file body + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); + + + String contentString = new String(buffer, StandardCharsets.UTF_8); + Map<String, Object> contentMap = null; + + //Create payload array from flow file content, one element per line + String[] messageArray = contentString.split("\\R"); + + //Convert each array element from JSON to HashMap and send to Druid + for (String message : messageArray) { + try { + contentMap = new ObjectMapper().readValue(message, HashMap.class); + } catch (IOException e1) { + log.error("Error parsing incoming message array in the flowfile body"); + } + + log.debug("Tranquilizer Status: " + tranquilizer.status().toString()); + messageStatus.put(flowFile, "pending"); + //Send data element to Druid, Asynch + Future<BoxedUnit> future = tranquilizer.send(contentMap); + log.debug(" Sent Payload to Druid: " + contentMap); + + //Wait for Druid to call back with status + future.addEventListener(new FutureEventListener<Object>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof MessageDroppedException) { + //This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data) + log.error(" 
FlowFile Dropped due to MessageDroppedException: " + cause.getMessage() + " : " + cause); + cause.getStackTrace(); + log.error(" Transferring FlowFile to DROPPED relationship"); + session.transfer(flowFile, REL_DROPPED); + } else { + log.error("FlowFile Processing Failed due to: {} : {}", new Object[]{cause.getMessage(), cause}); + cause.printStackTrace(); + log.error(" Transferring FlowFile to FAIL relationship"); + session.transfer(flowFile, REL_FAIL); + } + } + + @Override + public void onSuccess(Object value) { + log.debug(" FlowFile Processing Success : " + value.toString()); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, "Druid Tranquility Service"); + } + }); + + try { + //Wait for result from Druid + //This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread + Await.result(future); + } catch (Exception e) { + getLogger().error("Caught exception while waiting for result of put request: " + e.getMessage()); + } + } + //session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } + + public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException { + final ProcessSession session = factory.createSession(); + //Create new Thread to ensure that waiting for callback does not reduce throughput + new Thread(() -> { + processFlowFile(context, session); + }).start(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file 
mode 100644 index 0000000..4136d5e --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.processors.PutDruid \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java new file mode 100644 index 0000000..0137624 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + + +public class PutDruidTest { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml new file mode 100644 index 0000000..6b19faf --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -0,0 +1,41 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + <modules> + <module>nifi-druid-nar</module> + <module>nifi-druid-controller-service-api-nar</module> + <module>nifi-druid-controller-service-api</module> + <module>nifi-druid-controller-service</module> + <module>nifi-druid-processors</module> + </modules> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d68ee99..a396cd1 100644 --- a/pom.xml +++ b/pom.xml @@ -1494,6 +1494,18 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-controller-service-api-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-druid-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-livy-controller-service-api-nar</artifactId> <version>1.6.0-SNAPSHOT</version> <type>nar</type>
