Repository: nifi
Updated Branches:
  refs/heads/master 0eda71a9a -> e3da44fb6


[NiFi-3973] Add PutKudu processor to ingest data to Kudu


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3da44fb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3da44fb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3da44fb

Branch: refs/heads/master
Commit: e3da44fb626a495f18ead72955a0854d6b1f7151
Parents: 0eda71a
Author: cam <[email protected]>
Authored: Tue Jun 6 18:25:41 2017 -0700
Committer: ricky <[email protected]>
Committed: Fri Aug 25 10:53:31 2017 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-kudu-bundle/nifi-kudu-nar/pom.xml      |  42 +++
 .../src/main/resources/META-INF/NOTICE          |  51 ++++
 .../nifi-kudu-processors/pom.xml                |  91 ++++++
 .../nifi/processors/kudu/AbstractKudu.java      | 236 ++++++++++++++++
 .../nifi/processors/kudu/OperationType.java     |  24 ++
 .../apache/nifi/processors/kudu/PutKudu.java    | 136 +++++++++
 .../org.apache.nifi.processor.Processor         |  16 ++
 .../nifi/processors/kudu/MockPutKudu.java       |  58 ++++
 .../nifi/processors/kudu/TestPutKudu.java       | 275 +++++++++++++++++++
 nifi-nar-bundles/nifi-kudu-bundle/pom.xml       |  36 +++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  12 +
 13 files changed, 983 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index b875f1c..5b10e0c 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -218,6 +218,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kudu-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
new file mode 100644
index 0000000..4d2b98f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-kudu-bundle</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-kudu-nar</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kudu-processors</artifactId>
+            <version>1.4.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..34f4467
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,51 @@
+nifi-kudu-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2017 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache Commons JEXL
+    The following NOTICE information applies:
+      Apache Commons JEXL
+      Copyright 2001-2011 The Apache Software Foundation
+
+  (ASLv2) Kudu SDK
+    The following NOTICE information applies:
+      This product includes software developed by Cloudera, Inc.
+      (http://www.cloudera.com/).
+
+      This product includes software developed at
+      The Apache Software Foundation (http://www.apache.org/).
+
+      This product includes software developed by
+      Saxonica (http://www.saxonica.com/).
+
+  (ASLv2) Parquet MR
+    The following NOTICE information applies:
+      Parquet MR
+      Copyright 2012 Twitter, Inc.
+
+      This project includes code from https://github.com/lemire/JavaFastPFOR
+      
parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
+      Apache License Version 2.0 http://www.apache.org/licenses/.
+      (c) Daniel Lemire, http://lemire.me/en/
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
new file mode 100644
index 0000000..7e6c2d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -0,0 +1,91 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-kudu-bundle</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-kudu-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <version>1.4.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hadoop-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.5.4</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
new file mode 100644
index 0000000..5019e03
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+    protected static final PropertyDescriptor KUDU_MASTERS = new 
PropertyDescriptor.Builder()
+            .name("Kudu Masters")
+            .description("List all kudu masters's ip with port (e.g. 7051), 
comma separated")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow 
files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new 
PropertyDescriptor.Builder()
+            .name("Skip head line")
+            .description("Set it to true if your first line is the header line 
e.g. column names")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor INSERT_OPERATION = new 
PropertyDescriptor.Builder()
+            .name("Insert Operation")
+            .description("Specify operationType for this processor. 
Insert-Ignore will ignore duplicated rows")
+            .allowableValues(OperationType.values())
+            .defaultValue(OperationType.INSERT.toString())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it 
has been successfully stored in Kudu")
+            .build();
+    protected static final Relationship REL_FAILURE = new 
Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it 
cannot be sent to Kudu")
+            .build();
+
+    public static final String RECORD_COUNT_ATTR = "record.count";
+
+    protected String kuduMasters;
+    protected String tableName;
+    protected boolean skipHeadLine;
+    protected OperationType operationType;
+
+    protected KuduClient kuduClient;
+    protected KuduTable kuduTable;
+
+    @OnScheduled
+    public void OnScheduled(final ProcessContext context) {
+        try {
+            tableName = context.getProperty(TABLE_NAME).getValue();
+            kuduMasters = context.getProperty(KUDU_MASTERS).getValue();
+            if(kuduClient == null) {
+                getLogger().debug("Setting up Kudu connection...");
+                kuduClient = getKuduConnection(kuduMasters);
+                kuduTable = this.getKuduTable(kuduClient, tableName);
+                getLogger().debug("Kudu connection successfully initialized");
+            }
+        } catch(KuduException ex){
+            getLogger().error("Exception occurred while interacting with Kudu 
due to " + ex.getMessage(), ex);
+        }
+        operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
+        skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
+    }
+
+    @OnStopped
+    public final void closeClient() throws KuduException {
+        if (kuduClient != null) {
+            getLogger().info("Closing KuduClient");
+            kuduClient.close();
+            kuduClient = null;
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        try {
+            if (flowFile == null) return;
+            final Map<String,String> attributes = new HashMap<String, 
String>();
+            final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
+            final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            final KuduSession kuduSession = this.getKuduSession(kuduClient);
+
+            session.read(flowFile, (final InputStream rawIn) -> {
+                RecordReader recordReader = null;
+                try (final BufferedInputStream in = new 
BufferedInputStream(rawIn)) {
+                    try {
+                        recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger());
+                    } catch (Exception ex) {
+                        final RecordReaderFactoryException rrfe = new 
RecordReaderFactoryException("Unable to create RecordReader", ex);
+                        exceptionHolder.set(rrfe);
+                        return;
+                    }
+
+                    List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
+                    final RecordSet recordSet = recordReader.createRecordSet();
+
+                    if (skipHeadLine) recordSet.next();
+
+                    int numOfAddedRecord = 0;
+                    Record record = recordSet.next();
+                    while (record != null) {
+                        org.apache.kudu.client.Operation oper = null;
+                        if(operationType == OperationType.UPSERT) {
+                            oper = upsertRecordToKudu(kuduTable, record, 
fieldNames);
+                        } else {
+                            oper = insertRecordToKudu(kuduTable, record, 
fieldNames);
+                        }
+                        kuduSession.apply(oper);
+                        numOfAddedRecord++;
+                        record = recordSet.next();
+                    }
+
+                    getLogger().info("KUDU: number of inserted records: " + 
numOfAddedRecord);
+                    attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(numOfAddedRecord));
+
+                } catch (KuduException ex) {
+                    getLogger().error("Exception occurred while interacting 
with Kudu due to " + ex.getMessage(), ex);
+                    exceptionHolder.set(ex);
+                } catch (Exception e) {
+                    exceptionHolder.set(e);
+                } finally {
+                    IOUtils.closeQuietly(recordReader);
+                }
+            });
+            kuduSession.close();
+            if (exceptionHolder.get() != null) {
+                throw exceptionHolder.get();
+            }
+
+            // Update flow file's attributes after the ingestion
+            session.putAllAttributes(flowFile, attributes);
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, "Successfully added 
flowfile to kudu");
+
+        } catch (IOException | FlowFileAccessException e) {
+            getLogger().error("Failed to write due to {}", new Object[]{e});
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (Throwable t) {
+            getLogger().error("Failed to write due to {}", new Object[]{t});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    protected KuduClient getKuduConnection(String masters) {
+        return new KuduClient.KuduClientBuilder(kuduMasters).build();
+    }
+
+    protected KuduTable getKuduTable(KuduClient client, String tableName) 
throws KuduException {
+        return client.openTable(tableName);
+    }
+
+    protected KuduSession getKuduSession(KuduClient client){
+
+        KuduSession kuduSession = client.newSession();
+        if(operationType == OperationType.INSERT_IGNORE){
+            kuduSession.setIgnoreAllDuplicateRows(true);
+        }
+
+        return kuduSession;
+    }
+
+    protected abstract Insert insertRecordToKudu(final KuduTable table, final 
Record record, final List<String> fields) throws Exception;
+    protected abstract Upsert upsertRecordToKudu(final KuduTable table, final 
Record record, final List<String> fields) throws Exception;
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
new file mode 100644
index 0000000..4ab466e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+public enum OperationType {
+    INSERT,
+    INSERT_IGNORE,
+    UPSERT;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
new file mode 100644
index 0000000..53fc678
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the 
provided Record Reader, and writes those records " +
+        "to the specified Kudu's table. The schema for the table must be 
provided in the processor properties or from your source." +
+        " If any error occurs while reading records from the input, or writing 
records to Kudu, the FlowFile will be routed to failure")
+@WritesAttribute(attribute = "record.count", description = "Number of records 
written to Kudu")
+public class PutKudu extends AbstractKudu {
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(KUDU_MASTERS);
+        properties.add(TABLE_NAME);
+        properties.add(SKIP_HEAD_LINE);
+        properties.add(RECORD_READER);
+        properties.add(INSERT_OPERATION);
+
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        return rels;
+    }
+
+    @Override
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) throws IllegalStateException, Exception {
+        Upsert upsert = kuduTable.newUpsert();
+        this.insert(kuduTable, upsert, record, fieldNames);
+        return upsert;
+    }
+
+    @Override
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) throws IllegalStateException, Exception {
+        Insert insert = kuduTable.newInsert();
+        this.insert(kuduTable, insert, record, fieldNames);
+        return insert;
+    }
+
+    private void insert(KuduTable kuduTable, Operation operation, Record 
record, List<String> fieldNames){
+        PartialRow row = operation.getRow();
+        Schema colSchema = kuduTable.getSchema();
+
+        for (String colName : fieldNames) {
+            int colIdx = this.getColumnIndex(colSchema, colName);
+            if (colIdx != -1) {
+                Type colType = colSchema.getColumnByIndex(colIdx).getType();
+
+                switch (colType.getDataType()) {
+                    case BOOL:
+                        row.addBoolean(colIdx, record.getAsBoolean(colName));
+                        break;
+                    case FLOAT:
+                        row.addFloat(colIdx, record.getAsFloat(colName));
+                        break;
+                    case DOUBLE:
+                        row.addDouble(colIdx, record.getAsDouble(colName));
+                        break;
+                    case BINARY:
+                        row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
+                        break;
+                    case INT8:
+                    case INT16:
+                        short temp = 
(short)record.getAsInt(colName).intValue();
+                        row.addShort(colIdx, temp);
+                    case INT32:
+                        row.addInt(colIdx, record.getAsInt(colName));
+                        break;
+                    case INT64:
+                        row.addLong(colIdx, record.getAsLong(colName));
+                        break;
+                    case STRING:
+                        row.addString(colIdx, record.getAsString(colName));
+                        break;
+                    default:
+                        throw new IllegalStateException(String.format("unknown 
column type %s", colType));
+                }
+            }
+        }
+    }
+
+    private int getColumnIndex(Schema columns, String colName) {
+        try {
+            return columns.getColumnIndex(colName);
+        } catch (Exception ex) {
+            return -1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..908723c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kudu.PutKudu
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
new file mode 100644
index 0000000..feef584
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class MockPutKudu extends PutKudu{
+  @Override
+  protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    return mock(Insert.class);
+  }
+
+  @Override
+  protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    return mock(Upsert.class);
+  }
+
+  @Override
+  protected KuduClient getKuduConnection(String masters) {
+    return mock(KuduClient.class);
+  }
+
+  @Override
+  protected KuduSession getKuduSession(KuduClient client){
+    return mock(KuduSession.class);
+  }
+
+  @Override
+  protected KuduTable getKuduTable(KuduClient client, String tableName) throws 
KuduException {
+    return mock(KuduTable.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
new file mode 100644
index 0000000..fa3aa77
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
/**
 * Unit tests for {@link PutKudu}, run against {@link MockPutKudu} so that no
 * live Kudu cluster is required: all Kudu client/session/table interactions
 * are Mockito mocks, and the tests exercise only the record-reading,
 * attribute, provenance, and relationship-routing behaviour of the processor.
 */
public class TestPutKudu {

    // Property values applied to every test runner in setUp().
    public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
    public static final String DEFAULT_MASTERS = "testLocalHost:7051";
    public static final String SKIP_HEAD_LINE = "false";
    // Column names of the four-field schema built by createRecordReader().
    public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal";

    private TestRunner testRunner;
    private MockPutKudu processor;
    private MockRecordParser readerFactory;

    @Before
    public void setUp() {
        // Fresh processor and runner per test, configured for INSERT semantics;
        // each test registers its own reader under the "mock-reader-factory" id.
        processor = new MockPutKudu();
        testRunner = TestRunners.newTestRunner(processor);
        testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
        testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
        testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
        testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
        testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
    }

    @After
    public void close() {
        testRunner = null;
    }

    /**
     * Registers a MockRecordParser controller service as "mock-reader-factory"
     * with the schema (id:int, stringVal:string, num32Val:int, doubleVal:double)
     * and pre-loads it with {@code numOfRecord} generated records.
     */
    private void createRecordReader(int numOfRecord) throws InitializationException {

        readerFactory = new MockRecordParser();
        readerFactory.addSchemaField("id", RecordFieldType.INT);
        readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
        readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
        readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);

        for (int i=0; i < numOfRecord; i++) {
            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
        }

        testRunner.addControllerService("mock-reader-factory", readerFactory);
        testRunner.enableControllerService(readerFactory);
    }

    @Test
    public void testWriteKuduWithDefaults() throws IOException, InitializationException {
        createRecordReader(100);

        final String filename = "testWriteKudu-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);

        // verify the successful flow file has the expected content & attributes
        final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
        mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
        mockFlowFile.assertContentEquals("trigger");

        // verify we generated a provenance event
        final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
        Assert.assertEquals(1, provEvents.size());

        // verify it was a SEND event
        final ProvenanceEventRecord provEvent = provEvents.get(0);
        Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
    }

    @Test
    public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException {
        createRecordReader(0);

        // simulate throwing an IOException when the factory creates a reader which is what would happen when
        // invalid Avro is passed to the Avro reader factory
        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO"));

        testRunner.addControllerService("mock-reader-factory", readerFactory);
        testRunner.enableControllerService(readerFactory);
        testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");

        final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1);
    }

    @Test
    public void testValidSchemaShouldBeSuccessful() throws InitializationException, IOException {
        createRecordReader(10);
        final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis();

        // provide the table schema via the my.schema attribute
        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
        flowFileAttributes.put("my.schema", TABLE_SCHEMA);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
    }

    @Test
    public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
        createRecordReader(10);

        // reader is created successfully but fails on the first nextRecord() call
        final RecordReader recordReader = Mockito.mock(RecordReader.class);
        when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR"));

        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);

        testRunner.addControllerService("mock-reader-factory", readerFactory);
        testRunner.enableControllerService(readerFactory);
        testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");

        final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1);
    }

    @Test
    public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
        createRecordReader(0);
        // add a record whose int and double columns arrive as strings;
        // the processor is expected to coerce them to the schema types
        readerFactory.addRecord(1, "name0", "0", "89.89");

        final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
    }

    @Test
    public void testMissingColumInReader() throws InitializationException, IOException {
        createRecordReader(0);
        readerFactory.addRecord( "name0", "0", "89.89"); // record is one value short of the schema: no id

        final String filename = "testMissingColumInReader-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1);
    }

    @Test
    public void testSkipHeadLineTrue() throws InitializationException, IOException {
        createRecordReader(100);
        testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "true");

        final String filename = "testSkipHeadLineTrue-" + System.currentTimeMillis();

        final Map<String,String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);

        testRunner.enqueue("trigger", flowFileAttributes);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);

        // with SKIP_HEAD_LINE=true the first of the 100 records is dropped as a header
        MockFlowFile flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
        flowFiles.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "99");
    }

    @Test
    public void testInsertManyFlowFiles() throws Exception {
        createRecordReader(50);
        // flow-file content is opaque to the processor (records come from the
        // mock reader), so each output should carry its input content unchanged
        final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"valu11\" }";
        final String content2 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }";
        final String content3 = "{ \"field1\" : \"value3\", \"field2\" : \"value33\" }";

        testRunner.enqueue(content1.getBytes());
        testRunner.enqueue(content2.getBytes());
        testRunner.enqueue(content3.getBytes());

        testRunner.run(3);

        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 3);
        List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS);

        flowFiles.get(0).assertContentEquals(content1.getBytes());
        flowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");

        flowFiles.get(1).assertContentEquals(content2.getBytes());
        flowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");

        flowFiles.get(2).assertContentEquals(content3.getBytes());
        flowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
    }

    @Test
    public void testUpsertFlowFiles() throws Exception {
        createRecordReader(50);
        // switch from the default INSERT to UPSERT; routing and counting are unchanged
        testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString());
        testRunner.enqueue("string".getBytes());

        testRunner.run();

        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
        MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);

        flowFile.assertContentEquals("string".getBytes());
        flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
new file mode 100644
index 0000000..5c97172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-kudu-bundle</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-kudu-processors</module>
+        <module>nifi-kudu-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 6aef817..456291f 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -34,6 +34,7 @@
         <module>nifi-update-attribute-bundle</module>
         <module>nifi-kafka-bundle</module>
         <module>nifi-kite-bundle</module>
+        <module>nifi-kudu-bundle</module>
         <module>nifi-solr-bundle</module>
         <module>nifi-confluent-platform-bundle</module>
         <module>nifi-aws-bundle</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4eab25c..b719f6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1103,6 +1103,18 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-kudu-nar</artifactId>
+                <version>1.4.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-mongodb-nar</artifactId>
                 <version>1.4.0-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to