Repository: nifi
Updated Branches:
  refs/heads/master 0a44bad76 -> 02ba4cf2c


NIFI-5064 - Fixes and improvements to PutKudu processor

Signed-off-by: Pierre Villard <[email protected]>

This closes #2621.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/02ba4cf2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/02ba4cf2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/02ba4cf2

Branch: refs/heads/master
Commit: 02ba4cf2c85221af731f310b9f2d1624f4e2f446
Parents: 0a44bad
Author: Junegunn Choi <[email protected]>
Authored: Tue Apr 3 16:36:54 2018 +0900
Committer: Pierre Villard <[email protected]>
Committed: Wed May 9 19:39:29 2018 +0200

----------------------------------------------------------------------
 .../nifi-kudu-processors/pom.xml                |   7 +-
 .../nifi/processors/kudu/AbstractKudu.java      | 188 +++++++------
 .../apache/nifi/processors/kudu/PutKudu.java    |  38 ++-
 .../nifi/processors/kudu/MockPutKudu.java       |  25 +-
 .../nifi/processors/kudu/TestPutKudu.java       | 265 ++++++++++++++++++-
 5 files changed, 426 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 0eba8d9..250a2d2 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -47,7 +47,7 @@
         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-client</artifactId>
-            <version>1.3.0</version>
+            <version>1.7.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -91,5 +91,10 @@
             <version>2.5.4</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
index 68e7004..8e78eb4 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
@@ -17,16 +17,15 @@
 
 package org.apache.nifi.processors.kudu;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.kudu.client.Upsert;
 import org.apache.kudu.client.SessionConfiguration;
 
@@ -40,20 +39,20 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
 
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.serialization.record.Record;
 
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public abstract class AbstractKudu extends AbstractProcessor {
 
@@ -83,9 +82,10 @@ public abstract class AbstractKudu extends AbstractProcessor 
{
 
     protected static final PropertyDescriptor SKIP_HEAD_LINE = new 
PropertyDescriptor.Builder()
             .name("Skip head line")
-            .description("Set it to true if your first line is the header line 
e.g. column names")
+            .description("Deprecated. Used to ignore header lines, but this 
should be handled by a RecordReader " +
+                    "(e.g. \"Treat First Line as Header\" property of 
CSVReader)")
             .allowableValues("true", "false")
-            .defaultValue("true")
+            .defaultValue("false")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -112,12 +112,12 @@ public abstract class AbstractKudu extends 
AbstractProcessor {
 
     protected static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("Batch Size")
-            .description("Set the number of operations that can be buffered, 
between 2 - 100000. " +
+            .description("The maximum number of FlowFiles to process in a 
single execution, between 1 - 100000. " +
                     "Depending on your memory size, and data size per row set 
an appropriate batch size. " +
                     "Gradually increase this number to find out the best one 
for best performances.")
             .defaultValue("100")
             .required(true)
-            .addValidator(StandardValidators.createLongValidator(2, 100000, 
true))
+            .addValidator(StandardValidators.createLongValidator(1, 100000, 
true))
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
@@ -147,17 +147,17 @@ public abstract class AbstractKudu extends 
AbstractProcessor {
         try {
             tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
             kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-            if(kuduClient == null) {
+            operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
+            batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+            flushMode = 
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
+            skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
+
+            if (kuduClient == null) {
                 getLogger().debug("Setting up Kudu connection...");
                 kuduClient = getKuduConnection(kuduMasters);
                 kuduTable = this.getKuduTable(kuduClient, tableName);
                 getLogger().debug("Kudu connection successfully initialized");
             }
-
-            operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
-            batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-            flushMode = 
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
-            skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
         } catch(KuduException ex){
             getLogger().error("Exception occurred while interacting with Kudu 
due to " + ex.getMessage(), ex);
         }
@@ -172,75 +172,107 @@ public abstract class AbstractKudu extends 
AbstractProcessor {
         }
     }
 
+    private Stream<RowError> flushKuduSession(final KuduSession kuduSession, 
boolean close) throws Exception {
+        List<OperationResponse> responses = close ? kuduSession.close() : 
kuduSession.flush();
+        Stream<RowError> rowErrors;
+        if (kuduSession.getFlushMode() == FlushMode.AUTO_FLUSH_BACKGROUND) {
+            rowErrors = 
Stream.of(kuduSession.getPendingErrors().getRowErrors());
+        } else {
+            rowErrors = responses.stream()
+                .filter(OperationResponse::hasRowError)
+                .map(OperationResponse::getRowError);
+        }
+        return rowErrors;
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final FlowFile flowFile = session.get();
-        try {
-            if (flowFile == null) return;
-            final Map<String,String> attributes = new HashMap<String, 
String>();
-            final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>(null);
-            final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-            final KuduSession kuduSession = this.getKuduSession(kuduClient);
-
-            session.read(flowFile, (final InputStream rawIn) -> {
-                RecordReader recordReader = null;
-                try (final BufferedInputStream in = new 
BufferedInputStream(rawIn)) {
-                    try {
-                        recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger());
-                    } catch (Exception ex) {
-                        final RecordReaderFactoryException rrfe = new 
RecordReaderFactoryException("Unable to create RecordReader", ex);
-                        exceptionHolder.set(rrfe);
-                        return;
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) return;
+
+        final KuduSession kuduSession = getKuduSession(kuduClient);
+        final RecordReaderFactory recordReaderFactory =
+            
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        final Map<FlowFile, Integer> numRecords = new HashMap<>();
+        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
+        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+
+        int numBuffered = 0;
+        Stream<RowError> pendingRowErrors = Stream.empty();
+        for (FlowFile flowFile : flowFiles) {
+            try (final InputStream in = session.read(flowFile);
+                final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+                final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
+                final RecordSet recordSet = recordReader.createRecordSet();
+
+                // Deprecated
+                if (skipHeadLine) recordSet.next();
+
+                Record record = recordSet.next();
+                while (record != null) {
+                    Operation operation = operationType == OperationType.UPSERT
+                        ? upsertRecordToKudu(kuduTable, record, fieldNames)
+                        : insertRecordToKudu(kuduTable, record, fieldNames);
+                    // We keep track of mappings between Operations and their 
origins,
+                    // so that we know which FlowFiles should be marked 
failure after buffered flush.
+                    operationFlowFileMap.put(operation, flowFile);
+
+                    // Flush mutation buffer of KuduSession to avoid 
"MANUAL_FLUSH is enabled
+                    // but the buffer is too big" error. This can happen when 
flush mode is
+                    // MANUAL_FLUSH and a FlowFile has more than one records.
+                    if (numBuffered == batchSize && flushMode == 
FlushMode.MANUAL_FLUSH) {
+                        numBuffered = 0;
+                        pendingRowErrors = Stream.concat(pendingRowErrors, 
flushKuduSession(kuduSession, false));
                     }
 
-                    List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
-                    final RecordSet recordSet = recordReader.createRecordSet();
-
-                    if (skipHeadLine) recordSet.next();
-
-                    int numOfAddedRecord = 0;
-                    Record record = recordSet.next();
-                    while (record != null) {
-                        org.apache.kudu.client.Operation oper = null;
-                        if(operationType == OperationType.UPSERT) {
-                            oper = upsertRecordToKudu(kuduTable, record, 
fieldNames);
-                        } else {
-                            oper = insertRecordToKudu(kuduTable, record, 
fieldNames);
-                        }
-                        kuduSession.apply(oper);
-                        numOfAddedRecord++;
-                        record = recordSet.next();
+                    // OperationResponse is returned only when flush mode is 
set to AUTO_FLUSH_SYNC
+                    OperationResponse response = kuduSession.apply(operation);
+                    if (response != null && response.hasRowError()) {
+                        // Stop processing the records on the first error.
+                        // Note that Kudu does not support rolling back of 
previous operations.
+                        flowFileFailures.put(flowFile, response.getRowError());
+                        break;
                     }
 
-                    getLogger().info("KUDU: number of inserted records: " + 
numOfAddedRecord);
-                    attributes.put(RECORD_COUNT_ATTR, 
String.valueOf(numOfAddedRecord));
-
-                } catch (KuduException ex) {
-                    getLogger().error("Exception occurred while interacting 
with Kudu due to " + ex.getMessage(), ex);
-                    exceptionHolder.set(ex);
-                } catch (Exception e) {
-                    exceptionHolder.set(e);
-                } finally {
-                    IOUtils.closeQuietly(recordReader);
+                    numBuffered++;
+                    numRecords.merge(flowFile, 1, Integer::sum);
+                    record = recordSet.next();
                 }
-            });
-            kuduSession.close();
-            if (exceptionHolder.get() != null) {
-                throw exceptionHolder.get();
+            } catch (Exception ex) {
+                flowFileFailures.put(flowFile, ex);
             }
+        }
 
-            // Update flow file's attributes after the ingestion
-            session.putAllAttributes(flowFile, attributes);
-
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().send(flowFile, "Successfully added 
flowfile to kudu");
-
-        } catch (IOException | FlowFileAccessException e) {
-            getLogger().error("Failed to write due to {}", new Object[]{e});
-            session.transfer(flowFile, REL_FAILURE);
-        } catch (Throwable t) {
-            getLogger().error("Failed to write due to {}", new Object[]{t});
-            session.transfer(flowFile, REL_FAILURE);
+        try {
+            // Find RowErrors for each FlowFile
+            Map<FlowFile, List<RowError>> flowFileRowErrors =
+                Stream.concat(
+                    pendingRowErrors,
+                    numBuffered > 0 ? flushKuduSession(kuduSession, true) : 
Stream.empty()
+                ).collect(Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation())));
+
+            flowFiles.forEach(ff -> {
+                int count = numRecords.getOrDefault(ff, 0);
+                List<RowError> rowErrors = flowFileRowErrors.get(ff);
+                if (rowErrors != null) {
+                    rowErrors.forEach(rowError ->
+                        getLogger().error("Failed to write due to {}", new 
Object[]{rowError}));
+                    session.putAttribute(ff, RECORD_COUNT_ATTR, 
String.valueOf(count - rowErrors.size()));
+                    session.transfer(ff, REL_FAILURE);
+                } else {
+                    session.putAttribute(ff, RECORD_COUNT_ATTR, 
String.valueOf(count));
+                    if (flowFileFailures.containsKey(ff)) {
+                        getLogger().error("Failed to write due to {}", new 
Object[]{flowFileFailures.get(ff)});
+                        session.transfer(ff, REL_FAILURE);
+                    } else {
+                        session.transfer(ff, REL_SUCCESS);
+                        session.getProvenanceReporter().send(ff, "Successfully 
added FlowFile to Kudu");
+                    }
+                }
+            });
+        } catch (Exception ex) {
+            throw new ProcessException(ex);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 313e49b..eb35e6d 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,13 +17,13 @@
 
 package org.apache.nifi.processors.kudu;
 
+import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.Upsert;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -35,6 +35,9 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.serialization.record.Record;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -75,27 +78,31 @@ public class PutKudu extends AbstractKudu {
     @Override
     protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) throws IllegalStateException, Exception {
         Upsert upsert = kuduTable.newUpsert();
-        this.insert(kuduTable, upsert, record, fieldNames);
+        this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, 
fieldNames);
         return upsert;
     }
 
     @Override
     protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) throws IllegalStateException, Exception {
         Insert insert = kuduTable.newInsert();
-        this.insert(kuduTable, insert, record, fieldNames);
+        this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, 
fieldNames);
         return insert;
     }
 
-    private void insert(KuduTable kuduTable, Operation operation, Record 
record, List<String> fieldNames){
-        PartialRow row = operation.getRow();
-        Schema colSchema = kuduTable.getSchema();
-
+    @VisibleForTesting
+    void buildPartialRow(Schema schema, PartialRow row, Record record, 
List<String> fieldNames) {
         for (String colName : fieldNames) {
-            int colIdx = this.getColumnIndex(colSchema, colName);
+            int colIdx = this.getColumnIndex(schema, colName);
             if (colIdx != -1) {
-                Type colType = colSchema.getColumnByIndex(colIdx).getType();
+                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
+                Type colType = colSchema.getType();
+
+                if (record.getValue(colName) == null) {
+                    row.setNull(colName);
+                    continue;
+                }
 
-                switch (colType.getDataType()) {
+                switch (colType.getDataType(colSchema.getTypeAttributes())) {
                     case BOOL:
                         row.addBoolean(colIdx, record.getAsBoolean(colName));
                         break;
@@ -109,9 +116,11 @@ public class PutKudu extends AbstractKudu {
                         row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
                         break;
                     case INT8:
+                        row.addByte(colIdx, 
record.getAsInt(colName).byteValue());
+                        break;
                     case INT16:
-                        short temp = 
(short)record.getAsInt(colName).intValue();
-                        row.addShort(colIdx, temp);
+                        row.addShort(colIdx, 
record.getAsInt(colName).shortValue());
+                        break;
                     case INT32:
                         row.addInt(colIdx, record.getAsInt(colName));
                         break;
@@ -121,6 +130,11 @@ public class PutKudu extends AbstractKudu {
                     case STRING:
                         row.addString(colIdx, record.getAsString(colName));
                         break;
+                    case DECIMAL32:
+                    case DECIMAL64:
+                    case DECIMAL128:
+                        row.addDecimal(colIdx, new 
BigDecimal(record.getAsString(colName)));
+                        break;
                     default:
                         throw new IllegalStateException(String.format("unknown 
column type %s", colType));
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index feef584..f5a657b 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -26,14 +26,33 @@ import org.apache.kudu.client.Upsert;
 
 import org.apache.nifi.serialization.record.Record;
 
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.mockito.Mockito.mock;
 
-public class MockPutKudu extends PutKudu{
+public class MockPutKudu extends PutKudu {
+  private KuduSession session;
+  private LinkedList<Insert> insertQueue;
+
+  public MockPutKudu() {
+    this(mock(KuduSession.class));
+  }
+
+  public MockPutKudu(KuduSession session) {
+    this.session = session;
+    this.insertQueue = new LinkedList<>();
+  }
+
+  public void queue(Insert... operations) {
+    insertQueue.addAll(Arrays.asList(operations));
+  }
+
   @Override
   protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
-    return mock(Insert.class);
+    Insert insert = insertQueue.poll();
+    return insert != null ? insert : mock(Insert.class);
   }
 
   @Override
@@ -48,7 +67,7 @@ public class MockPutKudu extends PutKudu{
 
   @Override
   protected KuduSession getKuduSession(KuduClient client){
-    return mock(KuduSession.class);
+    return session;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index fa3aa77..315a0ee 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,6 +17,18 @@
 
 package org.apache.nifi.processors.kudu;
 
+import java.util.stream.Collectors;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.RowErrorsAndOverflowStatus;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
@@ -27,27 +39,43 @@ import 
org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
+import org.apache.nifi.util.Tuple;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.stubbing.OngoingStubbing;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
+
 public class TestPutKudu {
 
     public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
@@ -63,6 +91,10 @@ public class TestPutKudu {
     public void setUp() {
         processor = new MockPutKudu();
         testRunner = TestRunners.newTestRunner(processor);
+        setUpTestRunner(testRunner);
+    }
+
+    private void setUpTestRunner(TestRunner testRunner) {
         testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
         testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
         testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
@@ -125,7 +157,7 @@ public class TestPutKudu {
 
         // simulate throwing an IOException when the factory creates a reader 
which is what would happen when
         // invalid Avro is passed to the Avro reader factory
-        final RecordReaderFactory readerFactory = 
Mockito.mock(RecordReaderFactory.class);
+        final RecordReaderFactory readerFactory = 
mock(RecordReaderFactory.class);
         when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
         when(readerFactory.createRecordReader(any(FlowFile.class), 
any(InputStream.class), any(ComponentLog.class))).thenThrow(new 
IOException("NOT AVRO"));
 
@@ -162,10 +194,10 @@ public class TestPutKudu {
     public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() 
throws InitializationException, IOException, MalformedRecordException, 
SchemaNotFoundException {
         createRecordReader(10);
 
-        final RecordReader recordReader = Mockito.mock(RecordReader.class);
+        final RecordReader recordReader = mock(RecordReader.class);
         when(recordReader.nextRecord()).thenThrow(new 
MalformedRecordException("ERROR"));
 
-        final RecordReaderFactory readerFactory = 
Mockito.mock(RecordReaderFactory.class);
+        final RecordReaderFactory readerFactory = 
mock(RecordReaderFactory.class);
         when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
         when(readerFactory.createRecordReader(any(FlowFile.class), 
any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
 
@@ -272,4 +304,231 @@ public class TestPutKudu {
         flowFile.assertContentEquals("string".getBytes());
         flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50");
     }
+
+    @Test
+    public void testBuildRow() {
+        buildPartialRow((long) 1, "foo", (short) 10);
+    }
+
+    @Test
+    public void testBuildPartialRowNullable() {
+        buildPartialRow((long) 1, null, (short) 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildPartialRowNullPrimaryKey() {
+        buildPartialRow(null, "foo", (short) 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildPartialRowNotNullable() {
+        buildPartialRow((long) 1, "foo", null);
+    }
+
+    private void buildPartialRow(Long id, String name, Short age) {
+        final Schema kuduSchema = new Schema(Arrays.asList(
+            new ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
+            new ColumnSchemaBuilder("name", 
Type.STRING).nullable(true).build(),
+            new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
+            new ColumnSchemaBuilder("score", 
Type.DECIMAL).nullable(true).typeAttributes(
+                new 
ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
+            ).build()));
+
+        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("id", RecordFieldType.BIGINT.getDataType()),
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.SHORT.getDataType()),
+            new RecordField("score", RecordFieldType.LONG.getDataType())));
+
+        Map<String, Object> values = new HashMap<>();
+        values.put("id", id);
+        values.put("name", name);
+        values.put("age", age);
+        values.put("score", 10000L);
+        new PutKudu().buildPartialRow(
+            kuduSchema,
+            kuduSchema.newPartialRow(),
+            new MapRecord(schema, values),
+            schema.getFieldNames()
+        );
+    }
+
+    private Tuple<Insert, OperationResponse> insert(boolean success) {
+        Insert insert = mock(Insert.class);
+        OperationResponse response = mock(OperationResponse.class, 
Mockito.RETURNS_DEEP_STUBS);
+        when(response.hasRowError()).thenReturn(!success);
+        if (!success) {
+            when(response.getRowError().getOperation()).thenReturn(insert);
+        }
+        return new Tuple<>(insert, response);
+    }
+
    /** Outcome simulated for a single insert operation against the mocked KuduSession. */
    enum ResultCode {
        OK,        // operation applies successfully
        FAIL,      // operation yields an OperationResponse carrying a row error
        EXCEPTION  // session.apply() throws a KuduException
    }
+
    /**
     * Queues one mocked Insert per result code on the MockPutKudu and stubs
     * session.apply() to match, returning the OperationResponses the processor is
     * expected to observe. Queuing stops at the first EXCEPTION (apply() throws) and,
     * when {@code sync} is true, at the first FAIL — matching PutKudu's behavior of
     * abandoning the rest of a FlowFile once a synchronous operation fails.
     *
     * @param kudu    processor under test whose insert queue is being primed
     * @param session mocked KuduSession whose apply() is stubbed here
     * @param sync    true when the session is in AUTO_FLUSH_SYNC mode
     * @param results per-record outcomes to simulate, in order
     * @return responses for the operations that were queued (EXCEPTION adds none)
     */
    private LinkedList<OperationResponse> queueInsert(MockPutKudu kudu, KuduSession session, boolean sync, ResultCode... results) throws Exception {
        LinkedList<OperationResponse> responses = new LinkedList<>();
        for (ResultCode result : results) {
            boolean ok = result == OK;
            Tuple<Insert, OperationResponse> tuple = insert(ok);
            kudu.queue(tuple.getKey());

            if (result == EXCEPTION) {
                when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class));
                // Stop processing the rest of the records on the first exception
                break;
            } else {
                responses.add(tuple.getValue());
                if (sync) {
                    // apply() returns null on success in sync mode; the failing response otherwise
                    when(session.apply(tuple.getKey())).thenReturn(ok ? null : tuple.getValue());

                    // In AUTO_FLUSH_SYNC mode, PutKudu immediately knows when an operation has failed.
                    // In that case, it does not process the rest of the records in the FlowFile.
                    if (result == FAIL) break;
                }
            }
        }
        return responses;
    }
+
+    private static <T> void stubSlices(OngoingStubbing<T> stubbing, List<T> 
slices) {
+        for (T slice : slices) {
+            stubbing = stubbing.thenReturn(slice);
+        }
+    }
+
    /**
     * Runs four FlowFiles of three records each through PutKudu against a fully mocked
     * KuduSession in the given flush mode, simulating per-record outcomes (success,
     * row error, apply() exception), then asserts that three FlowFiles route to
     * failure and one to success with the expected successful-record counts.
     * The stubbing strategy differs per flush mode because PutKudu learns about row
     * errors through different channels in each mode.
     */
    private void testKuduPartialFailure(FlushMode flushMode, int batchSize) throws Exception {
        final int numFlowFiles = 4;
        final int numRecordsPerFlowFile = 3;
        final ResultCode[][] flowFileResults = new ResultCode[][] {
            new ResultCode[]{OK, OK, FAIL},

            // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC
            new ResultCode[]{OK, FAIL, OK},

            // Everything's okay
            new ResultCode[]{OK, OK, OK},

            // The last operation will not be submitted due to an exception from apply() call
            new ResultCode[]{OK, EXCEPTION, OK},
        };

        KuduSession session = mock(KuduSession.class);
        when(session.getFlushMode()).thenReturn(flushMode);
        MockPutKudu putKudu = new MockPutKudu(session);

        // Prime the session mock and collect, per FlowFile, the responses PutKudu should see.
        List<List<OperationResponse>> flowFileResponses = new ArrayList<>();
        boolean sync = flushMode == FlushMode.AUTO_FLUSH_SYNC;
        for (ResultCode[] results : flowFileResults) {
            flowFileResponses.add(queueInsert(putKudu, session, sync, results));
        }

        switch (flushMode) {
            case AUTO_FLUSH_SYNC:
                // flush() or close() returns an empty list
                when(session.close()).thenReturn(Collections.emptyList());
                break;
            case AUTO_FLUSH_BACKGROUND:
                // close() will be called for each batch of FlowFiles, however we do not check
                // the return value of it. Instead, we should check the pending errors of the session
                // as buffered operations may have already been flushed.
                when(session.close()).thenReturn(Collections.emptyList());

                // Build one getPendingErrors() response per batch of FlowFiles, carrying
                // the row errors accumulated across that batch.
                List<RowErrorsAndOverflowStatus> pendingErrorResponses = new ArrayList<>();
                while (!flowFileResponses.isEmpty()) {
                    int sliceSize = Math.min(batchSize, flowFileResponses.size());
                    List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize);

                    RowErrorsAndOverflowStatus pendingErrorResponse = mock(RowErrorsAndOverflowStatus.class);
                    RowError[] rowErrors = slice.stream()
                        .flatMap(List::stream)
                        .filter(OperationResponse::hasRowError)
                        .map(OperationResponse::getRowError)
                        .toArray(RowError[]::new);
                    when(pendingErrorResponse.getRowErrors()).thenReturn(rowErrors);
                    pendingErrorResponses.add(pendingErrorResponse);

                    flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size());
                }

                stubSlices(when(session.getPendingErrors()), pendingErrorResponses);
                break;
            case MANUAL_FLUSH:
                // close() will be called at the end of a batch, but flush() will also be called
                // whenever the mutation buffer of KuduSession becomes full. In PutKudu, we set
                // the size of the mutation buffer to match batchSize, so flush() is called only
                // when a FlowFile has more than one record.
                List<List<OperationResponse>> flushes = new ArrayList<>();
                List<List<OperationResponse>> closes = new ArrayList<>();

                while (!flowFileResponses.isEmpty()) {
                    int sliceSize = Math.min(batchSize, flowFileResponses.size());
                    List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize);
                    flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size());

                    // Re-chunk the responses of this FlowFile batch into mutation-buffer-sized
                    // groups: full groups are returned by flush(), the trailing partial by close().
                    List<OperationResponse> batch = new ArrayList<>();
                    for (OperationResponse response : slice.stream().flatMap(List::stream).collect(Collectors.toList())) {
                        if (batch.size() == batchSize) {
                            flushes.add(batch);
                            batch = new ArrayList<>();
                        }
                        batch.add(response);
                    }
                    if (flowFileResponses.isEmpty() && batch.size() == batchSize) {
                        // To handle the case where PutKudu ends the batch with flush()
                        // instead of close() due to the exception from the subsequent apply call.
                        flushes.add(batch);
                    } else if (batch.size() > 0) {
                        closes.add(batch);
                    }

                    if (!flushes.isEmpty()) stubSlices(when(session.flush()), flushes);
                    if (!closes.isEmpty()) stubSlices(when(session.close()), closes);
                }
                break;
        }

        testRunner = TestRunners.newTestRunner(putKudu);
        createRecordReader(numRecordsPerFlowFile);
        setUpTestRunner(testRunner);
        testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name());
        testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize));

        IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue(""));
        testRunner.run(numFlowFiles);

        // FlowFiles 1, 2, and 4 contain a failing/throwing record and must route to failure;
        // RECORD_COUNT_ATTR reflects how many records were applied successfully before routing.
        testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3);
        List<MockFlowFile> failedFlowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE);
        failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2");
        // In sync mode the record after the failure is never applied, so only 1 succeeds.
        failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ? "1" : "2");
        failedFlowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "1");

        testRunner.assertTransferCount(PutKudu.REL_SUCCESS, 1);
        testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "3");
    }
+
+    private void testKuduPartialFailure(FlushMode flushMode) throws Exception {
+      // Test against different batch sizes (up until the point where every 
record can be buffered at once)
+      for (int i = 1; i <= 11; i++) {
+          testKuduPartialFailure(flushMode, i);
+      }
+    }
+
    // Partial failures when the session flushes each operation synchronously.
    @Test
    public void testKuduPartialFailuresOnAutoFlushSync() throws Exception {
        testKuduPartialFailure(FlushMode.AUTO_FLUSH_SYNC);
    }
+
    // Partial failures when the session flushes in the background; errors are
    // surfaced via getPendingErrors() rather than individual apply() results.
    @Test
    public void testKuduPartialFailuresOnAutoFlushBackground() throws Exception {
        testKuduPartialFailure(FlushMode.AUTO_FLUSH_BACKGROUND);
    }
+
    // Partial failures when PutKudu controls flushing explicitly via flush()/close().
    @Test
    public void testKuduPartialFailuresOnManualFlush() throws Exception {
        testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
    }
 }

Reply via email to