This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 26b2036  NIFI-6552 - Kudu Put Operations
26b2036 is described below

commit 26b203616e79032bdcd47ac927d488a2ce68c656
Author: SandishKumarHN <[email protected]>
AuthorDate: Sun Jul 28 18:15:07 2019 -0700

    NIFI-6552 - Kudu Put Operations
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3610.
---
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |   2 +-
 .../processors/kudu/AbstractKuduProcessor.java     | 263 +++++++++++++++++++++
 .../apache/nifi/processors/kudu/OperationType.java |   4 +-
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 219 ++++-------------
 .../apache/nifi/processors/kudu/MockPutKudu.java   |  41 +++-
 .../apache/nifi/processors/kudu/TestPutKudu.java   |  69 ++++--
 6 files changed, 401 insertions(+), 197 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 92664ff..e54c531 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -46,7 +46,7 @@
         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-client</artifactId>
-            <version>1.7.0</version>
+            <version>1.10.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
new file mode 100644
index 0000000..c1b1a81
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.Update;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.record.Record;
+
+import javax.security.auth.login.LoginException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+
+/**
+ * Base class for NiFi processors that write to Apache Kudu.
+ * <p>
+ * Provides the shared connection properties, creates the {@link KuduClient} (logging in a Kerberos
+ * keytab user first when a credentials service is configured), closes the client and logs the user
+ * out when the processor is stopped, and converts NiFi {@link Record}s into Kudu write operations.
+ */
+public abstract class AbstractKuduProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor KUDU_MASTERS = new Builder()
+            .name("Kudu Masters")
+            .description("Comma separated addresses of the Kudu masters to connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials to use for authentication")
+            .required(false)
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .build();
+
+    static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
+            .name("kudu-operations-timeout-ms")
+            .displayName("Kudu Operation Timeout")
+            .description("Default timeout used for user operations (using sessions and scanners)")
+            .required(false)
+            .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + "ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    // NOTE(review): buildClient passes this value to the client builder as the default socket read
+    // timeout, which the description below does not mention; confirm the intended semantics.
+    static final PropertyDescriptor KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS = new Builder()
+            .name("kudu-keep-alive-period-timeout-ms")
+            .displayName("Kudu Keep Alive Period Timeout")
+            .description("Default timeout used for user operations")
+            .required(false)
+            .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) + "ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /** Client used by subclasses; assigned by createKuduClient and cleared by shutdown. */
+    protected volatile KuduClient kuduClient;
+
+    /** Kerberos user logged in by createKuduClient; null without a credentials service or after shutdown. */
+    private volatile KerberosUser kerberosUser;
+
+    public KerberosUser getKerberosUser() {
+        return this.kerberosUser;
+    }
+
+    public KuduClient getKuduClient() {
+        return this.kuduClient;
+    }
+
+    /**
+     * Creates the Kudu client for the configured masters.
+     * <p>
+     * When a Kerberos credentials service is configured, its keytab user is logged in and the client
+     * is built inside a {@link KerberosAction}; otherwise the client is built directly.
+     *
+     * @param context the process context supplying the connection properties
+     * @throws LoginException if the Kerberos login fails
+     */
+    public void createKuduClient(ProcessContext context) throws LoginException {
+        final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        getLogger().debug("Setting up Kudu connection...");
+        if (credentialsService == null) {
+            // No Kerberos: the client must still be built, otherwise getKuduClient() would return null.
+            this.kuduClient = buildClient(kuduMasters, context);
+        } else {
+            final String keytab = credentialsService.getKeytab();
+            final String principal = credentialsService.getPrincipal();
+            kerberosUser = loginKerberosUser(principal, keytab);
+
+            final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
+            this.kuduClient = kerberosAction.execute();
+        }
+        getLogger().debug("Kudu connection successfully initialized");
+    }
+
+    /**
+     * Builds a Kudu client for the given masters, applying the timeout properties.
+     *
+     * @param masters comma separated addresses of the Kudu masters
+     * @param context the process context supplying the timeout properties
+     * @return the new client
+     */
+    protected KuduClient buildClient(final String masters, final ProcessContext context) {
+        // Both properties accept variable-registry expressions, so evaluate them before parsing.
+        final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final Integer socketReadTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+        return new KuduClient.KuduClientBuilder(masters)
+                .defaultOperationTimeoutMs(operationTimeout)
+                .defaultSocketReadTimeoutMs(socketReadTimeout)
+                .build();
+    }
+
+    /**
+     * Flushes (or closes) the session and collects the row errors it reports.
+     * <p>
+     * In {@code AUTO_FLUSH_BACKGROUND} mode the errors come from the session's pending errors;
+     * otherwise they come from the responses returned by the flush/close call.
+     *
+     * @param kuduSession the session to flush
+     * @param close       {@code true} to close the session, {@code false} to only flush it
+     * @param rowErrors   list that receives the row errors
+     * @throws KuduException if flushing or closing the session fails
+     */
+    protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
+        final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
+
+        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+            rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+        } else {
+            responses.stream()
+                    .filter(OperationResponse::hasRowError)
+                    .map(OperationResponse::getRowError)
+                    .forEach(rowErrors::add);
+        }
+    }
+
+    /**
+     * Logs in a Kerberos keytab user.
+     *
+     * @param principal the Kerberos principal
+     * @param keytab    the keytab, as provided by the credentials service
+     * @return the logged-in user
+     * @throws LoginException if the login fails
+     */
+    protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+        kerberosUser.login();
+        return kerberosUser;
+    }
+
+    /**
+     * Closes the Kudu client and then logs out the Kerberos user, even if closing the client fails.
+     *
+     * @throws Exception if closing the client or logging out fails
+     */
+    @OnStopped
+    public void shutdown() throws Exception {
+        try {
+            if (this.kuduClient != null) {
+                getLogger().debug("Closing KuduClient");
+                this.kuduClient.close();
+                this.kuduClient = null;
+            }
+        } finally {
+            if (kerberosUser != null) {
+                kerberosUser.logout();
+                kerberosUser = null;
+            }
+        }
+    }
+
+    /**
+     * Copies the named record fields into a Kudu row.
+     * <p>
+     * Fields without a matching column in {@code schema} are skipped. A {@code null} value is rejected
+     * for primary key and non-nullable columns; for other columns it is written as {@code NULL}, unless
+     * {@code ignoreNull} is {@code true}, in which case the column is left unset.
+     *
+     * @param schema     schema of the target table
+     * @param row        row to populate
+     * @param record     record supplying the values
+     * @param fieldNames names of the record fields to copy
+     * @param ignoreNull leave null-valued columns unset instead of writing {@code NULL}; {@code null} means {@code false}
+     * @throws IllegalArgumentException if a primary key or non-nullable column would be set to {@code null}
+     * @throws IllegalStateException    if a column has an unsupported type
+     */
+    @VisibleForTesting
+    protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull) {
+        final boolean skipNulls = Boolean.TRUE.equals(ignoreNull);
+
+        for (String colName : fieldNames) {
+            final int colIdx = this.getColumnIndex(schema, colName);
+            if (colIdx == -1) {
+                // The record has a field that the table does not define.
+                continue;
+            }
+
+            final ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
+            final Type colType = colSchema.getType();
+
+            if (record.getValue(colName) == null) {
+                if (colSchema.isKey()) {
+                    throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
+                } else if (!colSchema.isNullable()) {
+                    throw new IllegalArgumentException(String.format("Can't set non-nullable column %s to null", colName));
+                }
+
+                if (!skipNulls) {
+                    row.setNull(colName);
+                }
+                continue;
+            }
+
+            switch (colType.getDataType(colSchema.getTypeAttributes())) {
+                case BOOL:
+                    row.addBoolean(colIdx, record.getAsBoolean(colName));
+                    break;
+                case FLOAT:
+                    row.addFloat(colIdx, record.getAsFloat(colName));
+                    break;
+                case DOUBLE:
+                    row.addDouble(colIdx, record.getAsDouble(colName));
+                    break;
+                case BINARY:
+                    // Encode explicitly instead of relying on the platform default charset.
+                    row.addBinary(colIdx, record.getAsString(colName).getBytes(StandardCharsets.UTF_8));
+                    break;
+                case INT8:
+                    row.addByte(colIdx, record.getAsInt(colName).byteValue());
+                    break;
+                case INT16:
+                    row.addShort(colIdx, record.getAsInt(colName).shortValue());
+                    break;
+                case INT32:
+                    row.addInt(colIdx, record.getAsInt(colName));
+                    break;
+                case INT64:
+                case UNIXTIME_MICROS:
+                    row.addLong(colIdx, record.getAsLong(colName));
+                    break;
+                case STRING:
+                    row.addString(colIdx, record.getAsString(colName));
+                    break;
+                case DECIMAL32:
+                case DECIMAL64:
+                case DECIMAL128:
+                    row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("unknown column type %s", colType));
+            }
+        }
+    }
+
+    /**
+     * Returns the index of the named column, or -1 if the schema has no such column.
+     */
+    private int getColumnIndex(Schema columns, String colName) {
+        try {
+            return columns.getColumnIndex(colName);
+        } catch (IllegalArgumentException ex) {
+            // Kudu's Schema#getColumnIndex reports an unknown column with IllegalArgumentException.
+            return -1;
+        }
+    }
+
+    /** Creates an upsert populated from the record's fields. */
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+        Upsert upsert = kuduTable.newUpsert();
+        buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull);
+        return upsert;
+    }
+
+    /** Creates an insert populated from the record's fields. */
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+        Insert insert = kuduTable.newInsert();
+        buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull);
+        return insert;
+    }
+
+    /** Creates a delete populated from the record's fields. */
+    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+        Delete delete = kuduTable.newDelete();
+        buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull);
+        return delete;
+    }
+
+    /** Creates an update populated from the record's fields. */
+    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+        Update update = kuduTable.newUpdate();
+        buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull);
+        return update;
+    }
+
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
index 4ab466e..08bcd77 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
@@ -20,5 +20,7 @@ package org.apache.nifi.processors.kudu;
 public enum OperationType {
     INSERT,
     INSERT_IGNORE,
-    UPSERT;
+    UPSERT,
+    UPDATE,
+    DELETE;
 }
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 900d3d3..1b47337 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,20 +17,13 @@
 
 package org.apache.nifi.processors.kudu;
 
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.SessionConfiguration;
-import org.apache.kudu.client.Upsert;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
@@ -39,20 +32,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -62,10 +51,8 @@ import org.apache.nifi.serialization.record.RecordSet;
 import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.math.BigDecimal;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -82,14 +69,8 @@ import java.util.stream.Collectors;
         "to the specified Kudu's table. The schema for the table must be 
provided in the processor properties or from your source." +
         " If any error occurs while reading records from the input, or writing 
records to Kudu, the FlowFile will be routed to failure")
 @WritesAttribute(attribute = "record.count", description = "Number of records 
written to Kudu")
-public class PutKudu extends AbstractProcessor {
-    protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
-        .name("Kudu Masters")
-        .description("List all kudu masters's ip with port (e.g. 7051), comma 
separated")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-        .build();
+
+public class PutKudu extends AbstractKuduProcessor {
 
     protected static final PropertyDescriptor TABLE_NAME = new Builder()
         .name("Table Name")
@@ -99,20 +80,13 @@ public class PutKudu extends AbstractProcessor {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
Builder()
-        .name("kerberos-credentials-service")
-        .displayName("Kerberos Credentials Service")
-        .description("Specifies the Kerberos Credentials to use for 
authentication")
-        .required(false)
-        .identifiesControllerService(KerberosCredentialsService.class)
-        .build();
-
     public static final PropertyDescriptor RECORD_READER = new Builder()
         .name("record-reader")
         .displayName("Record Reader")
         .description("The service for reading records from incoming flow 
files.")
         .identifiesControllerService(RecordReaderFactory.class)
         .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
     protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
@@ -127,9 +101,10 @@ public class PutKudu extends AbstractProcessor {
 
     protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
         .name("Insert Operation")
+        .displayName("Kudu Operation Type")
         .description("Specify operationType for this processor. Insert-Ignore 
will ignore duplicated rows")
-        .allowableValues(OperationType.values())
         .defaultValue(OperationType.INSERT.toString())
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
@@ -142,10 +117,11 @@ public class PutKudu extends AbstractProcessor {
             "MANUAL_FLUSH: the call returns when the operation has been added 
to the buffer, else it throws a KuduException if the buffer is full.")
         .allowableValues(SessionConfiguration.FlushMode.values())
         
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .required(true)
         .build();
 
-    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new 
Builder()
         .name("FlowFiles per Batch")
         .description("The maximum number of FlowFiles to process in a single 
execution, between 1 - 100000. " +
             "Depending on your memory size, and data size per row set an 
appropriate batch size " +
@@ -157,7 +133,7 @@ public class PutKudu extends AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
-    protected static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+    protected static final PropertyDescriptor BATCH_SIZE = new Builder()
         .name("Batch Size")
         .displayName("Max Records per Batch")
         .description("The maximum number of Records to process in a single 
Kudu-client batch, between 1 - 100000. " +
@@ -169,6 +145,15 @@ public class PutKudu extends AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
+    protected static final PropertyDescriptor IGNORE_NULL = new Builder()
+        .name("Ignore NULL")
+        .description("Ignore NULL on Kudu Put Operation, Update only non-Null 
columns if set true")
+        .defaultValue("false")
+        .allowableValues("true", "false")
+        .required(true)
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
 
     protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
         .name("success")
@@ -183,12 +168,10 @@ public class PutKudu extends AbstractProcessor {
 
     protected OperationType operationType;
     protected SessionConfiguration.FlushMode flushMode;
+
     protected int batchSize = 100;
     protected int ffbatch   = 1;
 
-    protected KuduClient kuduClient;
-    private volatile KerberosUser kerberosUser;
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -201,7 +184,9 @@ public class PutKudu extends AbstractProcessor {
         properties.add(FLUSH_MODE);
         properties.add(FLOWFILE_BATCH_SIZE);
         properties.add(BATCH_SIZE);
-
+        properties.add(IGNORE_NULL);
+        properties.add(KUDU_OPERATION_TIMEOUT_MS);
+        properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
         return properties;
     }
 
@@ -213,58 +198,15 @@ public class PutKudu extends AbstractProcessor {
         return rels;
     }
 
+    protected KerberosUser kerberosUser;
+    protected KuduSession kuduSession;
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException, 
LoginException {
-        final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-        operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
         batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         ffbatch   = 
context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = 
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
-
-        getLogger().debug("Setting up Kudu connection...");
-        final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        kuduClient = createClient(kuduMasters, credentialsService);
-        getLogger().debug("Kudu connection successfully initialized");
-    }
-
-    protected KuduClient createClient(final String masters, final 
KerberosCredentialsService credentialsService) throws LoginException {
-        if (credentialsService == null) {
-            return buildClient(masters);
-        }
-
-        final String keytab = credentialsService.getKeytab();
-        final String principal = credentialsService.getPrincipal();
-        kerberosUser = loginKerberosUser(principal, keytab);
-
-        final KerberosAction<KuduClient> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
-        return kerberosAction.execute();
-    }
-
-    protected KuduClient buildClient(final String masters) {
-        return new KuduClient.KuduClientBuilder(masters).build();
-    }
-
-    protected KerberosUser loginKerberosUser(final String principal, final 
String keytab) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, 
keytab);
-        kerberosUser.login();
-        return kerberosUser;
-    }
-
-    @OnStopped
-    public final void closeClient() throws KuduException, LoginException {
-        try {
-            if (kuduClient != null) {
-                getLogger().debug("Closing KuduClient");
-                kuduClient.close();
-                kuduClient = null;
-            }
-        } finally {
-            if (kerberosUser != null) {
-                kerberosUser.logout();
-                kerberosUser = null;
-            }
-        }
+        createKuduClient(context);
     }
 
     @Override
@@ -273,6 +215,7 @@ public class PutKudu extends AbstractProcessor {
         if (flowFiles.isEmpty()) {
             return;
         }
+        kerberosUser = getKerberosUser();
 
         final KerberosUser user = kerberosUser;
         if (user == null) {
@@ -290,9 +233,11 @@ public class PutKudu extends AbstractProcessor {
     }
 
     private void trigger(final ProcessContext context, final ProcessSession 
session, final List<FlowFile> flowFiles) throws ProcessException {
-        final KuduSession kuduSession = getKuduSession(kuduClient);
         final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
+        final KuduClient kuduClient = getKuduClient();
+        kuduSession = getKuduSession(kuduClient);
+
         final Map<FlowFile, Integer> numRecords = new HashMap<>();
         final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
         final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
@@ -300,6 +245,8 @@ public class PutKudu extends AbstractProcessor {
         int numBuffered = 0;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
+            operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
+            Boolean ignoreNull = 
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
                 final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
@@ -309,10 +256,7 @@ public class PutKudu extends AbstractProcessor {
 
                 Record record = recordSet.next();
                 while (record != null) {
-                    Operation operation = operationType == OperationType.UPSERT
-                        ? upsertRecordToKudu(kuduTable, record, fieldNames)
-                        : insertRecordToKudu(kuduTable, record, fieldNames);
-
+                    Operation operation = getKuduOperationType(operationType, 
record, fieldNames, ignoreNull, kuduTable);
                     // We keep track of mappings between Operations and their 
origins,
                     // so that we know which FlowFiles should be marked 
failure after buffered flush.
                     operationFlowFileMap.put(operation, flowFile);
@@ -399,91 +343,20 @@ public class PutKudu extends AbstractProcessor {
         return kuduSession;
     }
 
-    private void flushKuduSession(final KuduSession kuduSession, boolean 
close, final List<RowError> rowErrors) throws KuduException {
-        final List<OperationResponse> responses = close ? kuduSession.close() 
: kuduSession.flush();
-
-        if (kuduSession.getFlushMode() == 
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-            
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
-        } else {
-            responses.stream()
-                .filter(OperationResponse::hasRowError)
-                .map(OperationResponse::getRowError)
-                .forEach(rowErrors::add);
-        }
-    }
-
-
-
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
-        Upsert upsert = kuduTable.newUpsert();
-        this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, 
fieldNames);
-        return upsert;
-    }
-
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
-        Insert insert = kuduTable.newInsert();
-        this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, 
fieldNames);
-        return insert;
-    }
-
-    void buildPartialRow(Schema schema, PartialRow row, Record record, 
List<String> fieldNames) {
-        for (String colName : fieldNames) {
-            int colIdx = this.getColumnIndex(schema, colName);
-            if (colIdx != -1) {
-                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
-                Type colType = colSchema.getType();
-
-                if (record.getValue(colName) == null) {
-                    row.setNull(colName);
-                    continue;
-                }
-
-                switch (colType.getDataType(colSchema.getTypeAttributes())) {
-                    case BOOL:
-                        row.addBoolean(colIdx, record.getAsBoolean(colName));
-                        break;
-                    case FLOAT:
-                        row.addFloat(colIdx, record.getAsFloat(colName));
-                        break;
-                    case DOUBLE:
-                        row.addDouble(colIdx, record.getAsDouble(colName));
-                        break;
-                    case BINARY:
-                        row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
-                        break;
-                    case INT8:
-                        row.addByte(colIdx, 
record.getAsInt(colName).byteValue());
-                        break;
-                    case INT16:
-                        row.addShort(colIdx, 
record.getAsInt(colName).shortValue());
-                        break;
-                    case INT32:
-                        row.addInt(colIdx, record.getAsInt(colName));
-                        break;
-                    case INT64:
-                    case UNIXTIME_MICROS:
-                        row.addLong(colIdx, record.getAsLong(colName));
-                        break;
-                    case STRING:
-                        row.addString(colIdx, record.getAsString(colName));
-                        break;
-                    case DECIMAL32:
-                    case DECIMAL64:
-                    case DECIMAL128:
-                        row.addDecimal(colIdx, new 
BigDecimal(record.getAsString(colName)));
-                        break;
-                    default:
-                        throw new IllegalStateException(String.format("unknown 
column type %s", colType));
-                }
-            }
-        }
-    }
-
-    private int getColumnIndex(Schema columns, String colName) {
-        try {
-            return columns.getColumnIndex(colName);
-        } catch (Exception ex) {
-            return -1;
+    private Operation getKuduOperationType(OperationType operationType, Record 
record, List<String> fieldNames, Boolean ignoreNull, KuduTable kuduTable) {
+        switch (operationType) {
+            case DELETE:
+                return deleteRecordFromKudu(kuduTable, record, fieldNames, 
ignoreNull);
+            case INSERT:
+                return insertRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull);
+            case INSERT_IGNORE:
+                return insertRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull);
+            case UPSERT:
+                return upsertRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull);
+            case UPDATE:
+                return updateRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull);
+            default:
+                throw new 
IllegalArgumentException(String.format("OperationType: %s not supported by 
Kudu", operationType));
         }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 32a943a..b5ce71f 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,11 +17,14 @@
 
 package org.apache.nifi.processors.kudu;
 
-import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.Update;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.Record;
 
@@ -30,14 +33,15 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.LinkedList;
 
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MockPutKudu extends PutKudu {
+
     private KuduSession session;
     private LinkedList<Insert> insertQueue;
 
@@ -58,18 +62,41 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames, Boolean ignoreNull) {
         Insert insert = insertQueue.poll();
         return insert != null ? insert : mock(Insert.class);
     }
 
     @Override
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames, Boolean ignoreNull) {
         return mock(Upsert.class);
     }
 
     @Override
-    protected KuduClient buildClient(final String masters) {
+    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames, Boolean ignoreNull) {
+        return mock(Delete.class);
+    }
+
+    @Override
+    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames, Boolean ignoreNull) {
+        return mock(Update.class);
+    }
+
+    @Override
+    public KuduClient buildClient(final String masters, ProcessContext 
context) {
+        final KuduClient client = mock(KuduClient.class);
+
+        try {
+            
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
+        } catch (final Exception e) {
+            throw new AssertionError(e);
+        }
+
+        return client;
+    }
+
+    @Override
+    public KuduClient getKuduClient() {
         final KuduClient client = mock(KuduClient.class);
 
         try {
@@ -138,4 +165,4 @@ public class MockPutKudu extends PutKudu {
     protected KuduSession getKuduSession(KuduClient client) {
         return session;
     }
-}
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 322f040..ac7c0cd 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.processors.kudu;
 
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
@@ -74,7 +74,7 @@ import static 
org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -86,20 +86,23 @@ public class TestPutKudu {
     public static final String TABLE_SCHEMA = 
"id,stringVal,num32Val,doubleVal";
 
     private TestRunner testRunner;
+
     private MockPutKudu processor;
+
     private MockRecordParser readerFactory;
 
     @Before
-    public void setUp() {
+    public void setUp() throws InitializationException {
         processor = new MockPutKudu();
         testRunner = TestRunners.newTestRunner(processor);
         setUpTestRunner(testRunner);
     }
 
-    private void setUpTestRunner(TestRunner testRunner) {
+    private void setUpTestRunner(TestRunner testRunner) throws 
InitializationException {
         testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
         testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
         testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
+        testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
         testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
         testRunner.setProperty(PutKudu.INSERT_OPERATION, 
OperationType.INSERT.toString());
     }
@@ -173,7 +176,6 @@ public class TestPutKudu {
         assertTrue(proc.loggedOut());
     }
 
-
     @Test
     public void testInsecureClient() throws InitializationException {
         createRecordReader(1);
@@ -327,6 +329,42 @@ public class TestPutKudu {
     }
 
     @Test
+    public void testDeleteFlowFiles() throws Exception {
+        createRecordReader(50);
+        testRunner.setProperty(PutKudu.INSERT_OPERATION, 
"${kudu.record.delete}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("kudu.record.delete", "DELETE");
+
+        testRunner.enqueue("string".getBytes(), attributes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
+
+        flowFile.assertContentEquals("string".getBytes());
+        flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
+    }
+
+    @Test
+    public void testUpdateFlowFiles() throws Exception {
+        createRecordReader(50);
+        testRunner.setProperty(PutKudu.INSERT_OPERATION, 
"${kudu.record.update}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("kudu.record.update", "UPDATE");
+
+        testRunner.enqueue("string".getBytes(), attributes);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
+
+        flowFile.assertContentEquals("string".getBytes());
+        flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
+    }
+
+    @Test
     public void testBuildRow() {
         buildPartialRow((long) 1, "foo", (short) 10);
     }
@@ -348,11 +386,11 @@ public class TestPutKudu {
 
     private void buildPartialRow(Long id, String name, Short age) {
         final Schema kuduSchema = new Schema(Arrays.asList(
-            new ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
-            new ColumnSchemaBuilder("name", 
Type.STRING).nullable(true).build(),
-            new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
-            new ColumnSchemaBuilder("updated_at", 
Type.UNIXTIME_MICROS).nullable(false).build(),
-            new ColumnSchemaBuilder("score", 
Type.DECIMAL).nullable(true).typeAttributes(
+            new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT64).key(true).build(),
+            new ColumnSchema.ColumnSchemaBuilder("name", 
Type.STRING).nullable(true).build(),
+            new ColumnSchema.ColumnSchemaBuilder("age", 
Type.INT16).nullable(false).build(),
+            new ColumnSchema.ColumnSchemaBuilder("updated_at", 
Type.UNIXTIME_MICROS).nullable(false).build(),
+            new ColumnSchema.ColumnSchemaBuilder("score", 
Type.DECIMAL).nullable(true).typeAttributes(
                 new 
ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
             ).build()));
 
@@ -369,11 +407,12 @@ public class TestPutKudu {
         values.put("age", age);
         values.put("updated_at", System.currentTimeMillis() * 1000);
         values.put("score", 10000L);
-        new PutKudu().buildPartialRow(
+        processor.buildPartialRow(
             kuduSchema,
             kuduSchema.newPartialRow(),
             new MapRecord(schema, values),
-            schema.getFieldNames()
+            schema.getFieldNames(),
+                true
         );
     }
 
@@ -393,12 +432,12 @@ public class TestPutKudu {
         EXCEPTION
     }
 
-    private LinkedList<OperationResponse> queueInsert(MockPutKudu kudu, 
KuduSession session, boolean sync, ResultCode... results) throws Exception {
+    private LinkedList<OperationResponse> queueInsert(MockPutKudu putKudu, 
KuduSession session, boolean sync, ResultCode... results) throws Exception {
         LinkedList<OperationResponse> responses = new LinkedList<>();
         for (ResultCode result : results) {
             boolean ok = result == OK;
             Tuple<Insert, OperationResponse> tuple = insert(ok);
-            kudu.queue(tuple.getKey());
+            putKudu.queue(tuple.getKey());
 
             if (result == EXCEPTION) {
                 
when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class));
@@ -526,6 +565,7 @@ public class TestPutKudu {
         testRunner.run(numFlowFiles);
 
         testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3);
+
         List<MockFlowFile> failedFlowFiles = 
testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE);
         
failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2");
         
failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ? 
"1" : "2");
@@ -557,7 +597,6 @@ public class TestPutKudu {
         testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
     }
 
-
     public static class MockKerberosCredentialsService extends 
AbstractControllerService implements KerberosCredentialsService {
         private final String keytab;
         private final String principal;

Reply via email to