Repository: nifi
Updated Branches:
  refs/heads/master d93d53817 -> 7e2910399


NIFI-4428: - Implement PutDruid Processor and Controller

update

added provenance report

added parameters for batch control

WIP


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7fa0a34a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7fa0a34a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7fa0a34a

Branch: refs/heads/master
Commit: 7fa0a34aba23ea390720a8f41aefbe59f26b15c9
Parents: d93d538
Author: vvaks <[email protected]>
Authored: Wed Sep 27 13:23:23 2017 -0400
Committer: joewitt <[email protected]>
Committed: Tue Jan 16 13:07:56 2018 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  10 +
 .../pom.xml                                     |  35 ++
 .../nifi-druid-controller-service-api/pom.xml   |  84 ++++
 .../controller/api/DruidTranquilityService.java |  28 ++
 .../nifi-druid-controller-service/pom.xml       |  95 ++++
 .../controller/DruidTranquilityController.java  | 452 +++++++++++++++++++
 ...org.apache.nifi.controller.ControllerService |  16 +
 .../DruidTranquilityControllerTest.java         |  22 +
 .../nifi-druid-bundle/nifi-druid-nar/pom.xml    |  39 ++
 .../nifi-druid-processors/pom.xml               |  95 ++++
 .../org/apache/nifi/processors/PutDruid.java    | 196 ++++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../apache/nifi/processors/PutDruidTest.java    |  22 +
 nifi-nar-bundles/nifi-druid-bundle/pom.xml      |  41 ++
 pom.xml                                         |  12 +
 15 files changed, 1163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 49ef70a..d576261 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -561,6 +561,16 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-livy-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-controller-service-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml
new file mode 100644
index 0000000..6f4f606
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api-nar/pom.xml
@@ -0,0 +1,35 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-druid-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-druid-controller-service-api-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-controller-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml
new file mode 100644
index 0000000..97eac38
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/pom.xml
@@ -0,0 +1,84 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+       <groupId>org.apache.nifi</groupId>
+       <artifactId>nifi-druid-bundle</artifactId>
+       <version>1.5.0-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>nifi-druid-controller-service-api</artifactId>
+  
+   <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+       <dependency>
+           <groupId>io.druid</groupId>
+           <artifactId>tranquility-core_2.10</artifactId>
+           <version>0.8.2</version>
+           <exclusions>
+               <exclusion>
+                   <groupId>io.druid</groupId>
+                   <artifactId>druid-console</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>io.druid</groupId>
+                   <artifactId>druid-aws-common</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>com.twitter</groupId>
+                   <artifactId>finagle-core_2.10</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>c3p0</groupId>
+                   <artifactId>c3p0</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>io.tesla.aether</groupId>
+                   <artifactId>tesla-aether</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>org.glassfish</groupId>
+                   <artifactId>javax.el</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>com.sun.jersey</groupId>
+                   <artifactId>*</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>org.eclipse.jetty</groupId>
+                   <artifactId>*</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>org.eclipse.aether</groupId>
+                   <artifactId>aether-api</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>com.maxmind.geoip2</groupId>
+                   <artifactId>geoip2</artifactId>
+               </exclusion>
+               <exclusion>
+                   <groupId>net.java.dev.jets3t</groupId>
+                   <artifactId>jets3t</artifactId>
+               </exclusion>
+           </exclusions>
+       </dependency>
+       </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java
new file mode 100644
index 0000000..705303f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service-api/src/main/java/org/apache/nifi/controller/api/DruidTranquilityService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.api;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+
+public interface DruidTranquilityService extends ControllerService{
+    Tranquilizer<Map<String,Object>> getTranquilizer();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
new file mode 100644
index 0000000..2714969
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
@@ -0,0 +1,95 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-druid-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-druid-controller-service</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-controller-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-core_2.10</artifactId>
+            <version>0.8.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.druid</groupId>
+                    <artifactId>druid-console</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.druid</groupId>
+                    <artifactId>druid-aws-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>c3p0</groupId>
+                    <artifactId>c3p0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.tesla.aether</groupId>
+                    <artifactId>tesla-aether</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish</groupId>
+                    <artifactId>javax.el</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.aether</groupId>
+                    <artifactId>aether-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.maxmind.geoip2</groupId>
+                    <artifactId>geoip2</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.java.dev.jets3t</groupId>
+                    <artifactId>jets3t</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java
new file mode 100644
index 0000000..6d4ee19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/DruidTranquilityController.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.api.DruidTranquilityService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import com.metamx.common.Granularity;
+import com.metamx.tranquility.beam.Beam;
+import com.metamx.tranquility.beam.ClusteredBeamTuning;
+import com.metamx.tranquility.druid.DruidBeamConfig;
+import com.metamx.tranquility.druid.DruidBeams;
+import com.metamx.tranquility.druid.DruidDimensions;
+import com.metamx.tranquility.druid.DruidEnvironment;
+import com.metamx.tranquility.druid.DruidLocation;
+import com.metamx.tranquility.druid.DruidRollup;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.metamx.tranquility.typeclass.Timestamper;
+
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import io.druid.query.aggregation.DoubleMinAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongMaxAggregatorFactory;
+import io.druid.query.aggregation.LongMinAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+
+@Tags({"Druid", "Timeseries", "OLAP", "ingest"})
+@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. "
+        + "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided."
+        + "Details on how describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html")
+public class DruidTranquilityController extends AbstractControllerService 
implements DruidTranquilityService {
+    private String firehosePattern = "druid:firehose:%s";
+    private int clusterPartitions = 1;
+    private int clusterReplication = 1;
+    private String indexRetryPeriod = "PT10M";
+
+    private Tranquilizer tranquilizer = null;
+
+    public static final PropertyDescriptor DATASOURCE = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-data-source")
+            .displayName("Druid Data Source")
+            .description("Druid Data Source") //TODO description, example
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor CONNECT_STRING = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-zk-connect-string")
+            .displayName("Zookeeper Connection String")
+            .description("ZK Connect String for Druid") //TODO example
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-index-service-path")
+            .displayName("Index Service Path")
+            .description("Druid Index Service path as defined via the Druid 
Overlord druid.service property.")
+            .required(true)
+            .defaultValue("druid/overlord")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-discovery-path")
+            .displayName("Discovery Path")
+            .description("Druid Discovery Path as configured in Druid Common 
druid.discovery.curator.path property")
+            .required(true)
+            .defaultValue("/druid/discovery")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor TIMESTAMP_FIELD = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-timestamp-field")
+            .displayName("Timestamp field")
+            .description("The name of the field that will be used as the 
timestamp. Should be in ISO format.")
+            .required(true)
+            .defaultValue("timestamp")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor AGGREGATOR_JSON = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-aggregators-descriptor")
+            .displayName("Aggregator JSON")
+            .description("Tranquility-compliant JSON string that defines what 
aggregators to apply on ingest."
+                    + "Example: "
+                    + "["
+                    + "{"
+                    + "\t\"type\" : \"count\","
+                    + "\t\"name\" : \"count\","
+                    + "},"
+                    + "{"
+                    + "\t\"name\" : \"value_sum\","
+                    + "\t\"type\" : \"doubleSum\","
+                    + "\t\"fieldName\" : \"value\""
+                    + "},"
+                    + "{"
+                    + "\t\"fieldName\" : \"value\","
+                    + "\t\"name\" : \"value_min\","
+                    + "\t\"type\" : \"doubleMin\""
+                    + "},"
+                    + "{"
+                    + "\t\"type\" : \"doubleMax\","
+                    + "\t\"name\" : \"value_max\","
+                    + "\t\"fieldName\" : \"value\""
+                    + "}"
+                    + "]")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DIMENSIONS_LIST = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-dimensions-list")
+            .displayName("Dimension Fields")
+            .description("A comma separated list of field names that will be 
stored as dimensions on ingest.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor SEGMENT_GRANULARITY = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-segment-granularity")
+            .displayName("Segment Granularity")
+            .description("Time unit by which to group and aggregate/rollup 
events.")
+            .required(true)
+            .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", 
"DAY", "MONTH", "YEAR", "Use druid.segment.granularity variable")
+            .defaultValue("MINUTE")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor QUERY_GRANULARITY = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-query-granularity")
+            .displayName("Query Granularity")
+            .description("Time unit by which to group and aggregate/rollup 
events. The value must be at least as large as the value of Segment 
Granularity.")
+            .required(true)
+            .allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", 
"DAY", "MONTH", "YEAR", "Use druid.query.granularity variable")
+            .defaultValue("TEN_MINUTE")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WINDOW_PERIOD = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-window-period")
+            .displayName("Late Event Grace Period")
+            .description("Grace period to allow late arriving events for real 
time ingest.")
+            .required(true)
+            .allowableValues("PT1M", "PT10M", "PT60M")// TODO possibly 
friendly name
+            .defaultValue("PT10M")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-batch-size")
+            .displayName("Batch Size")
+            .description("Maximum number of messages to send at once.")
+            .required(true)
+            .defaultValue("2000")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_PENDING_BATCHES = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-max-pending-batches")
+            .displayName("Max Pending Batches")
+            .description("Maximum number of batches that may be in flight 
before service blocks and waits for one to finish.")
+            .required(true)
+            .defaultValue("5")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor LINGER_MILLIS = new 
PropertyDescriptor.Builder()
+            .name("druid-cs-linger-millis")
+            .displayName("Linger (milliseconds)")
+            .description("Wait this long for batches to collect more messages 
(up to Batch Size) before sending them. "
+                    + "Set to zero to disable waiting. "
+                    + "Set to -1 to always wait for complete batches before 
sending. ")
+            .required(true)
+            .defaultValue("1000")
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(DATASOURCE);
+        props.add(CONNECT_STRING);
+        props.add(DRUID_INDEX_SERVICE_PATH);
+        props.add(DRUID_DISCOVERY_PATH);
+        props.add(DIMENSIONS_LIST);
+        props.add(AGGREGATOR_JSON);
+        props.add(SEGMENT_GRANULARITY);
+        props.add(QUERY_GRANULARITY);
+        props.add(WINDOW_PERIOD);
+        props.add(TIMESTAMP_FIELD);
+        props.add(MAX_BATCH_SIZE);
+        props.add(MAX_PENDING_BATCHES);
+        props.add(LINGER_MILLIS);
+
+        properties = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws 
InitializationException {
+        ComponentLog log = getLogger();
+        log.info("Starting Druid Tranquility Controller Service...");
+
+        final String dataSource = context.getProperty(DATASOURCE).getValue();
+        final String zkConnectString = 
context.getProperty(CONNECT_STRING).getValue();
+        final String indexService = 
context.getProperty(DRUID_INDEX_SERVICE_PATH).getValue();
+        final String discoveryPath = 
context.getProperty(DRUID_DISCOVERY_PATH).getValue();
+        final String timestampField = 
context.getProperty(TIMESTAMP_FIELD).getValue();
+        final String segmentGranularity = 
context.getProperty(SEGMENT_GRANULARITY).getValue();
+        final String queryGranularity = 
context.getProperty(QUERY_GRANULARITY).getValue();
+        final String windowPeriod = 
context.getProperty(WINDOW_PERIOD).getValue();
+        final String aggregatorJSON = 
context.getProperty(AGGREGATOR_JSON).getValue();
+        final String dimensionsStringList = 
context.getProperty(DIMENSIONS_LIST).getValue();
+        final int maxBatchSize = 
Integer.valueOf(context.getProperty(MAX_BATCH_SIZE).getValue());
+        final int maxPendingBatches = 
Integer.valueOf(context.getProperty(MAX_PENDING_BATCHES).getValue());
+        final int lingerMillis = 
Integer.valueOf(context.getProperty(LINGER_MILLIS).getValue());
+
+        final List<String> dimensions = getDimensions(dimensionsStringList);
+        final List<AggregatorFactory> aggregator = 
getAggregatorList(aggregatorJSON);
+
+        final Timestamper<Map<String, Object>> timestamper = new 
Timestamper<Map<String, Object>>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public DateTime timestamp(Map<String, Object> theMap) {
+                return new DateTime(theMap.get(timestampField));
+            }
+        };
+
+        Iterator<AggregatorFactory> aggIterator = aggregator.iterator();
+        AggregatorFactory currFactory;
+        log.debug("Number of Aggregations Defined: " + aggregator.size());
+        while (aggIterator.hasNext()) {
+            currFactory = aggIterator.next();
+            log.debug("Verifying Aggregator Definition");
+            log.debug("Aggregator Name: " + currFactory.getName());
+            log.debug("Aggregator Type: " + currFactory.getTypeName());
+            log.debug("Aggregator Req Fields: " + 
currFactory.requiredFields());
+        }
+        // Tranquility uses ZooKeeper (through Curator) for coordination.
+        final CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString(zkConnectString)
+                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) // 
TODO expose as properties, maybe fibonacci backoff
+                .build();
+        curator.start();
+
+        // The JSON serialization of your object must have a timestamp field 
in a format that Druid understands. By default,
+        // Druid expects the field to be called "timestamp" and to be an 
ISO8601 timestamp.
+        final TimestampSpec timestampSpec = new TimestampSpec(timestampField, 
"auto", null);
+
+        final Beam<Map<String, Object>> beam = DruidBeams.builder(timestamper)
+                .curator(curator)
+                .discoveryPath(discoveryPath)
+                
.location(DruidLocation.create(DruidEnvironment.create(indexService, 
firehosePattern), dataSource))
+                .timestampSpec(timestampSpec)
+                
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, 
QueryGranularity.fromString(queryGranularity)))
+                .tuning(
+                        ClusteredBeamTuning
+                                .builder()
+                                
.segmentGranularity(getSegmentGranularity(segmentGranularity))
+                                .windowPeriod(new Period(windowPeriod))
+                                .partitions(clusterPartitions)
+                                .replicants(clusterReplication)
+                                .build()
+                )
+                .druidBeamConfig(
+                        DruidBeamConfig
+                                .builder()
+                                .indexRetryPeriod(new Period(indexRetryPeriod))
+                                .build())
+                .buildBeam();
+
+        tranquilizer = Tranquilizer.builder()
+                .maxBatchSize(maxBatchSize)
+                .maxPendingBatches(maxPendingBatches)
+                .lingerMillis(lingerMillis)
+                .blockOnFull(true)
+                .build(beam);
+
+        tranquilizer.start();
+    }
+
+    /**
+     * Exposes the Tranquilizer held in this controller's {@code tranquilizer} field.
+     *
+     * @return the current value of the {@code tranquilizer} field
+     */
+    public Tranquilizer getTranquilizer() {
+        return this.tranquilizer;
+    }
+
+    private List<Map<String, String>> parseJsonString(String aggregatorJson) {
+        ObjectMapper mapper = new ObjectMapper();
+        List<Map<String, String>> aggSpecList = null;
+        try {
+            aggSpecList = mapper.readValue(aggregatorJson, List.class);
+            return aggSpecList;
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Exception while parsing the 
aggregrator JSON");
+        }
+    }
+
+    private List<String> getDimensions(String dimensionStringList) {
+        List<String> dimensionList = new 
LinkedList(Arrays.asList(dimensionStringList.split(",")));
+        return dimensionList;
+    }
+
+    private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
+        List<AggregatorFactory> aggregatorList = new LinkedList<>();
+        List<Map<String, String>> aggregatorInfo = 
parseJsonString(aggregatorJSON);
+        for (Map<String, String> aggregator : aggregatorInfo) {
+
+            if (aggregator.get("type").equalsIgnoreCase("count")) {
+                aggregatorList.add(getCountAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) {
+                aggregatorList.add(getDoubleSumAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) {
+                aggregatorList.add(getDoubleMaxAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) {
+                aggregatorList.add(getDoubleMinAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("longsum")) {
+                aggregatorList.add(getLongSumAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("longmax")) {
+                aggregatorList.add(getLongMaxAggregator(aggregator));
+            } else if (aggregator.get("type").equalsIgnoreCase("longmin")) {
+                aggregatorList.add(getLongMinAggregator(aggregator));
+            }
+        }
+
+        return aggregatorList;
+    }
+
+    private AggregatorFactory getLongMinAggregator(Map<String, String> map) {
+        return new LongMinAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getLongMaxAggregator(Map<String, String> map) {
+        return new LongMaxAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getLongSumAggregator(Map<String, String> map) {
+        return new LongSumAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) {
+        return new DoubleMinAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) {
+        return new DoubleMaxAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) {
+        return new DoubleSumAggregatorFactory(map.get("name"), 
map.get("fieldName"));
+    }
+
+    private AggregatorFactory getCountAggregator(Map<String, String> map) {
+        return new CountAggregatorFactory(map.get("name"));
+    }
+
+    /**
+     * Maps a segment granularity name onto the matching Granularity constant.
+     * Matching is exact (case-sensitive); any name that is not listed below
+     * resolves to {@code Granularity.HOUR}.
+     *
+     * @param segmentGranularity granularity name such as "MINUTE" or "DAY"
+     * @return the constant for the given name, or HOUR when it is not recognised
+     */
+    private Granularity getSegmentGranularity(String segmentGranularity) {
+        switch (segmentGranularity) {
+            case "SECOND":
+                return Granularity.SECOND;
+            case "MINUTE":
+                return Granularity.MINUTE;
+            case "FIVE_MINUTE":
+                return Granularity.FIVE_MINUTE;
+            case "TEN_MINUTE":
+                return Granularity.TEN_MINUTE;
+            case "FIFTEEN_MINUTE":
+                return Granularity.FIFTEEN_MINUTE;
+            case "SIX_HOUR":
+                return Granularity.SIX_HOUR;
+            case "DAY":
+                return Granularity.DAY;
+            case "WEEK":
+                return Granularity.WEEK;
+            case "MONTH":
+                return Granularity.MONTH;
+            case "YEAR":
+                return Granularity.YEAR;
+            case "HOUR":
+            default:
+                // HOUR doubles as the fallback for unrecognised names
+                return Granularity.HOUR;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..53d6d06
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.controller.DruidTranquilityController
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java
new file mode 100644
index 0000000..95d5e9d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/test/java/org/apache/nifi/controller/DruidTranquilityControllerTest.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+
+public class DruidTranquilityControllerTest {
+    // Placeholder: this class currently declares no test methods.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml
new file mode 100644
index 0000000..746a077
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-nar/pom.xml
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-druid-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <!-- NAR that packages the Druid controller service and processors -->
+    <artifactId>nifi-druid-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-controller-service</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-processors</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
new file mode 100644
index 0000000..6ce35f6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/pom.xml
@@ -0,0 +1,95 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-druid-bundle</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-druid-processors</artifactId>
+
+    <dependencies>
+        <!-- NiFi artifacts use the same version as the bundle rather than the
+             older 1.3.0 release, so the module builds against matching APIs -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-druid-controller-service-api</artifactId>
+            <version>1.5.0-SNAPSHOT</version>
+        </dependency>
+        <!-- Tranquility client; the exclusions below are kept exactly as
+             declared in the original POM -->
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>tranquility-core_2.10</artifactId>
+            <version>0.8.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.druid</groupId>
+                    <artifactId>druid-console</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.druid</groupId>
+                    <artifactId>druid-aws-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>c3p0</groupId>
+                    <artifactId>c3p0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.tesla.aether</groupId>
+                    <artifactId>tesla-aether</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish</groupId>
+                    <artifactId>javax.el</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.aether</groupId>
+                    <artifactId>aether-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.maxmind.geoip2</groupId>
+                    <artifactId>geoip2</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.java.dev.jets3t</groupId>
+                    <artifactId>jets3t</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java
new file mode 100644
index 0000000..3134d5a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/PutDruid.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.nifi.controller.api.DruidTranquilityService;
+import com.metamx.tranquility.tranquilizer.MessageDroppedException;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+
+import scala.runtime.BoxedUnit;
+
+/**
+ * Reads newline-delimited JSON objects from a FlowFile and sends each one to
+ * Druid through the Tranquilizer supplied by the configured
+ * {@link DruidTranquilityService}.
+ *
+ * <p>A FlowFile is routed exactly once, after all of its lines were handled:
+ * to {@code failure} if any line could not be parsed or sent (or if it held no
+ * event at all), otherwise to {@code dropped} if at least one event was
+ * dropped, otherwise to {@code success}.</p>
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"Druid", "Timeseries", "OLAP", "ingest"})
+@CapabilityDescription("Sends events to Apache Druid for Indexing. "
+        + "Leverages Druid Tranquility Controller service. "
+        + "Incoming flow files are expected to contain 1 or many JSON objects, "
+        + "one JSON object per line")
+public class PutDruid extends AbstractSessionFactoryProcessor {
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder()
+            .name("putdruid-tranquility-service")
+            .displayName("Tranquility Service")
+            .description("Tranquility Service to use for sending events to Druid")
+            .required(true)
+            .identifiesControllerService(DruidTranquilityService.class)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Success relationship")
+            .build();
+
+    public static final Relationship REL_FAIL = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles are routed to this relationship when they cannot be parsed")
+            .build();
+
+    public static final Relationship REL_DROPPED = new Relationship.Builder()
+            .name("dropped")
+            .description("FlowFiles are routed to this relationship when they are outside "
+                    + "of the configured time window, timestamp format is invalid, etc.")
+            .build();
+
+    /**
+     * Builds the immutable property and relationship collections returned by
+     * the accessor methods below.
+     *
+     * @param context initialization context (not used)
+     */
+    @Override
+    public void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(DRUID_TRANQUILITY_SERVICE);
+        this.properties = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_DROPPED);
+        rels.add(REL_FAIL);
+        this.relationships = Collections.unmodifiableSet(rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Takes one FlowFile from the session, sends every JSON line it contains
+     * to Druid, then transfers the FlowFile and commits the session.
+     *
+     * <p>The session is only touched from the calling thread. The Tranquility
+     * callback is used for logging only, because it may run on another thread.</p>
+     *
+     * @param context supplies the configured Tranquility controller service
+     * @param session session the FlowFile is taken from and committed on
+     */
+    private void processFlowFile(ProcessContext context, ProcessSession session) {
+        final ComponentLog log = getLogger();
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        if (flowFile.getSize() == 0) {
+            // An empty FlowFile must still be transferred; returning without
+            // doing so would leave it held by a session that is never committed.
+            log.error("FlowFile {} has no content; transferring to FAIL relationship",
+                    new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAIL);
+            session.commit();
+            return;
+        }
+
+        //Get handle on Druid Tranquility session
+        final DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE)
+                .asControllerService(DruidTranquilityService.class);
+        final Tranquilizer<Map<String, Object>> tranquilizer = tranquilityController.getTranquilizer();
+
+        //Get data from flow file body
+        //NOTE(review): the int cast limits this to content below 2 GB
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+        final String contentString = new String(buffer, StandardCharsets.UTF_8);
+
+        final ObjectMapper mapper = new ObjectMapper();
+        boolean anyFailed = false;
+        boolean anyDropped = false;
+        int acceptedCount = 0;
+
+        //Convert each line from JSON to a map and send it to Druid
+        for (final String message : contentString.split("\\R")) {
+            if (message.trim().isEmpty()) {
+                // Blank lines carry no event
+                continue;
+            }
+
+            final Map<String, Object> contentMap;
+            try {
+                contentMap = mapper.readValue(message, HashMap.class);
+            } catch (IOException e) {
+                // Do not send anything for a line that failed to parse
+                log.error("Error parsing incoming message array in the flowfile body", e);
+                anyFailed = true;
+                continue;
+            }
+
+            log.debug("Tranquilizer Status: " + tranquilizer.status().toString());
+            //Send data element to Druid, Asynch
+            final Future<BoxedUnit> future = tranquilizer.send(contentMap);
+            log.debug(" Sent Payload to Druid: " + contentMap);
+
+            //Log the outcome reported by Druid; routing is decided below
+            future.addEventListener(new FutureEventListener<Object>() {
+                @Override
+                public void onFailure(Throwable cause) {
+                    if (cause instanceof MessageDroppedException) {
+                        //This happens when event timestamp targets a Druid
+                        //Indexing task that has closed (Late Arriving Data)
+                        log.error(" FlowFile Dropped due to MessageDroppedException: "
+                                + cause.getMessage() + " : " + cause);
+                    } else {
+                        log.error("FlowFile Processing Failed due to: {} : {}",
+                                new Object[]{cause.getMessage(), cause});
+                    }
+                }
+
+                @Override
+                public void onSuccess(Object value) {
+                    log.debug(" FlowFile Processing Success : " + value);
+                }
+            });
+
+            try {
+                //Block until Druid has answered for this event
+                Await.result(future);
+                acceptedCount++;
+            } catch (Exception e) {
+                if (e instanceof MessageDroppedException) {
+                    anyDropped = true;
+                } else {
+                    log.error("Caught exception while waiting for result of put request: "
+                            + e.getMessage());
+                    anyFailed = true;
+                }
+                if (e instanceof InterruptedException) {
+                    // Restore the interrupt flag and stop sending further events
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+
+        // Worst outcome wins; content without any event counts as a failure
+        final Relationship destination;
+        if (anyFailed || (acceptedCount == 0 && !anyDropped)) {
+            destination = REL_FAIL;
+        } else if (anyDropped) {
+            destination = REL_DROPPED;
+        } else {
+            destination = REL_SUCCESS;
+        }
+
+        if (acceptedCount > 0) {
+            session.getProvenanceReporter().send(flowFile, "Druid Tranquility Service");
+        }
+        log.debug("Transferring FlowFile to {} relationship", new Object[]{destination.getName()});
+        session.transfer(flowFile, destination);
+        session.commit();
+    }
+
+    /**
+     * Creates a session and processes one FlowFile on the calling thread.
+     * No extra thread is started here: the call already blocks per event, and
+     * a thread per trigger would be unbounded and would leave the session
+     * open if processing threw.
+     *
+     * @param context process context supplied by the framework
+     * @param factory factory used to create the session for this invocation
+     * @throws ProcessException if processing fails; the session is rolled back first
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException {
+        final ProcessSession session = factory.createSession();
+        try {
+            processFlowFile(context, session);
+        } catch (final Throwable t) {
+            // Return the FlowFile to its queue (penalized) instead of leaking it
+            session.rollback(true);
+            throw t;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..4136d5e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.processors.PutDruid
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java
 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java
new file mode 100644
index 0000000..0137624
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/PutDruidTest.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+
+public class PutDruidTest {
+    // Placeholder: this class currently declares no test methods.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml 
b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
new file mode 100644
index 0000000..6b19faf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -0,0 +1,41 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <!-- Aggregator POM for the Druid bundle modules listed below -->
+    <artifactId>nifi-druid-bundle</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+    <modules>
+        <module>nifi-druid-nar</module>
+        <module>nifi-druid-controller-service-api-nar</module>
+        <module>nifi-druid-controller-service-api</module>
+        <module>nifi-druid-controller-service</module>
+        <module>nifi-druid-processors</module>
+    </modules>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/7fa0a34a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d68ee99..a396cd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1494,6 +1494,18 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-druid-controller-service-api-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-druid-nar</artifactId>
+                <version>1.5.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+               <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-livy-controller-service-api-nar</artifactId>
                 <version>1.6.0-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to