Repository: nifi Updated Branches: refs/heads/HDF-3.1-maint [created] 29b82727a
http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java new file mode 100644 index 0000000..28e084c --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/processors/druid/PutDruidRecordTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.druid; + + +import org.apache.nifi.controller.druid.DruidTranquilityController; +import org.apache.nifi.controller.druid.MockDruidTranquilityController; +import org.apache.nifi.controller.api.druid.DruidTranquilityService; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Unit tests for PutDruidRecord, driven through a TestRunner with a mock record reader, a mock record writer and a MockDruidTranquilityController. */ +public class PutDruidRecordTest { + + private TestRunner runner; + private DruidTranquilityService druidTranquilityController; + private MockRecordParser recordReader; + private MockRecordWriter recordWriter; + /** Builds the runner, registers and enables the mock reader and writer, configures and enables the mock Tranquility service, then sets the three processor properties to the registered service identifiers. */ + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutDruidRecord.class); + druidTranquilityController = new MockDruidTranquilityController(2,3); /* per the notes in testPutRecords: "drop after 2", "fail after 3" */ + recordReader = new MockRecordParser(); + recordWriter = new MockRecordWriter(null, true, 2); /* per the note in testPutRecords: the last argument is "fail after 2" */ + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.addControllerService("writer", recordWriter); + runner.enableControllerService(recordWriter); + /* The Tranquility service gets all of its properties first; it is validated and enabled only afterwards. */ + runner.addControllerService("tranquility", druidTranquilityController); + runner.setProperty(druidTranquilityController, DruidTranquilityController.DATASOURCE, "test"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]"); + runner.setProperty(druidTranquilityController, DruidTranquilityController.DIMENSIONS_LIST, "dim1,dim2"); + 
runner.assertValid(druidTranquilityController); + runner.enableControllerService(druidTranquilityController); + /* The processor refers to each service by the identifier it was registered under above. */ + runner.setProperty(PutDruidRecord.RECORD_READER_FACTORY, "reader"); + runner.setProperty(PutDruidRecord.RECORD_WRITER_FACTORY, "writer"); + runner.setProperty(PutDruidRecord.DRUID_TRANQUILITY_SERVICE, "tranquility"); + } + /** With a schema but no records added to the reader, expects exactly one flow file on success carrying a record count of "0", and none on failure or dropped. */ + @Test + public void testEmptyRecord() throws Exception { + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 1); + runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 0); + runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "0"); + } + /** Feeds six records through the processor and expects one flow file per relationship with record counts of "2" (success), "1" (dropped) and "2" (failure), plus exactly one SEND provenance event. */ + @Test + public void testPutRecords() throws Exception { + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, "Soccer"); + recordReader.addRecord("Jane Doe", 47, "Tennis"); + recordReader.addRecord("Sally Doe", 47, "Curling"); // Will be dropped due to the "drop after 2" parameter on the MockDruidTranquilityController + recordReader.addRecord("Jimmy Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController + recordReader.addRecord("Pizza Doe", 14, null); // Will fail due to the "fail after 3" parameter on the MockDruidTranquilityController + recordReader.addRecord("Bad Record", "X", 13); // RecordWriter fails due to the "fail after 2" parameter on the MockRecordWriter, not written to output + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(PutDruidRecord.REL_SUCCESS, 
1); + runner.assertTransferCount(PutDruidRecord.REL_FAILURE, 1); + runner.assertTransferCount(PutDruidRecord.REL_DROPPED, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2"); + flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_DROPPED).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "1"); + flowFile = runner.getFlowFilesForRelationship(PutDruidRecord.REL_FAILURE).get(0); + flowFile.assertAttributeEquals(PutDruidRecord.RECORD_COUNT, "2"); + + // Assert a single SEND event present for the successful flow file + assertEquals(1, runner.getProvenanceEvents().stream().filter((e) -> ProvenanceEventType.SEND.equals(e.getEventType())).count()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/nifi-druid-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml new file mode 100644 index 0000000..82d3017 --- /dev/null +++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml @@ -0,0 +1,56 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <!-- Parent POM of the Druid bundle: pom packaging, with its child modules listed in the modules section at the end. --> + <artifactId>nifi-druid-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + <packaging>pom</packaging> + <!-- Version properties referenced by the managed dependencies that follow. --> + <properties> + <druid.version>0.9.1</druid.version> + <tranquility.version>0.8.2</tranquility.version> + </properties> + <!-- Version management only: these entries set the version for modules that declare the dependency themselves. --> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.11</artifactId> + <version>${tranquility.version}</version> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>druid-processing</artifactId> + <version>${druid.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <!-- Child modules that make up this bundle. --> + <modules> + <module>nifi-druid-nar</module> + <module>nifi-druid-controller-service-api-nar</module> + <module>nifi-druid-controller-service-api</module> + <module>nifi-druid-controller-service</module> + <module>nifi-druid-processors</module> + </modules> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/45edd12a/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 70a5e3c..0812c67 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -91,6 +91,7 @@ <module>nifi-metrics-reporting-bundle</module> <module>nifi-spark-bundle</module> <module>nifi-atlas-bundle</module> + <module>nifi-druid-bundle</module> </modules> <build>