Repository: nifi
Updated Branches:
  refs/heads/HDF-3.1-maint [created] 29b82727a


http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java
new file mode 100644
index 0000000..28e084c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.druid;
+
+
+import org.apache.nifi.controller.druid.DruidTranquilityController;
+import org.apache.nifi.controller.druid.MockDruidTranquilityController;
+import org.apache.nifi.controller.api.druid.DruidTranquilityService;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class PutDruidRecordTest {
+
+    private TestRunner runner;
+    private DruidTranquilityService druidTranquilityController;
+    private MockRecordParser recordReader;
+    private MockRecordWriter recordWriter;
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutDruidRecord.class);
+        druidTranquilityController = new MockDruidTranquilityController(2,3);
+        recordReader = new MockRecordParser();
+        recordWriter = new MockRecordWriter(null, true, 2);
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        runner.addControllerService("tranquility", druidTranquilityController);
+        runner.setProperty(druidTranquilityController, DruidTranquilityController.DATASOURCE, "test");
+        runner.setProperty(druidTranquilityController, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(druidTranquilityController, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]");
+        runner.setProperty(druidTranquilityController, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2");
+        runner.assertValid(druidTranquilityController);
+        runner.enableControllerService(druidTranquilityController);
+
+        runner.setProperty(PutDruidRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(PutDruidRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(PutDruidRecord.DRUID_TRANQUILITY_SERVICE, "tranquility");
+    }
+
+    @Test
+    public void testEmptyRecord() throws Exception {
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 0);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "0");
+    }
+
+    @Test
+    public void testPutRecords() throws Exception {
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling"); // Will be dropped due to the "drop after 2" parameter on the MockDruidTranquilityController
+        recordReader.addRecord("Jimmy Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController
+        recordReader.addRecord("Pizza Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController
+        recordReader.addRecord("Bad Record", "X", 13); // RecordWriter fail due to the "fail after 2" parameter on the MockRecordWriter, not written to output
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 1);
+        runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2");
+        flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_DROPPED).get(0);
+        flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "1");
+        flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_FAILURE).get(0);
+        flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2");
+
+        // Assert a single SEND event present for the successful flow file
+        assertEquals(1, runner.getProvenanceEvents().stream().filter((e) -> ProvenanceEventType.SEND.equals(e.getEventType())).count());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
new file mode 100644
index 0000000..82d3017
--- /dev/null
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -0,0 +1,56 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-druid-bundle</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+        <druid.version>0.9.1</druid.version>
+        <tranquility.version>0.8.2</tranquility.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>io.druid</groupId>
+                <artifactId>tranquility-core_2.11</artifactId>
+                <version>${tranquility.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.druid</groupId>
+                <artifactId>druid-processing</artifactId>
+                <version>${druid.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <modules>
+        <module>nifi-druid-nar</module>
+        <module>nifi-druid-controller-service-api-nar</module>
+        <module>nifi-druid-controller-service-api</module>
+        <module>nifi-druid-controller-service</module>
+        <module>nifi-druid-processors</module>
+    </modules>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 70a5e3c..0812c67 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -91,6 +91,7 @@
         <module>nifi-metrics-reporting-bundle</module>
         <module>nifi-spark-bundle</module>
         <module>nifi-atlas-bundle</module>
+        <module>nifi-druid-bundle</module>
   </modules>
 
     <build>

Reply via email to