This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3719fddf84 NIFI-12700: refactored PutKudu to optimize memory handling 
for AUTO_FLUSH_SYNC flush mode (unbatched flush)
3719fddf84 is described below

commit 3719fddf84ffcf18ed29d87ee931aa1acc1699d3
Author: emiliosetiadarma <[email protected]>
AuthorDate: Mon Jan 22 01:28:09 2024 -0800

    NIFI-12700: refactored PutKudu to optimize memory handling for 
AUTO_FLUSH_SYNC flush mode (unbatched flush)
    
    NIFI-12700: made changes based on PR comments. Simplified statements 
involving determination of whether or not there are flowfile 
failures/rowErrors. Separated out getting rowErrors from OperationResponses 
into its own function
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #8322
---
 .../processors/kudu/AbstractKuduProcessor.java     |  40 ++++--
 .../kudu/AutoFlushSyncPutKuduResult.java           |  78 +++++++++++
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 110 ++++++++--------
 .../apache/nifi/processors/kudu/PutKuduResult.java | 144 +++++++++++++++++++++
 .../processors/kudu/StandardPutKuduResult.java     |  83 ++++++++++++
 5 files changed, 388 insertions(+), 67 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 47c1a34903..b44f2330ee 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -22,6 +22,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executor;
@@ -35,6 +36,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -217,19 +220,33 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
         }
     }
 
-    protected void flushKuduSession(final KuduSession kuduSession, boolean 
close, final List<RowError> rowErrors) throws KuduException {
-        final List<OperationResponse> responses = close ? kuduSession.close() 
: kuduSession.flush();
-
+    /**
+     * Get the pending errors from the active {@link KuduSession}. This will 
only be applicable if the flushMode is
+     * {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}.
+     * @return  a {@link List} of pending {@link RowError}s
+     */
+    protected List<RowError> getPendingRowErrorsFromKuduSession(final 
KuduSession kuduSession) {
         if (kuduSession.getFlushMode() == 
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-            
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+            return 
Arrays.asList(kuduSession.getPendingErrors().getRowErrors());
         } else {
-            responses.stream()
-                    .filter(OperationResponse::hasRowError)
-                    .map(OperationResponse::getRowError)
-                    .forEach(rowErrors::add);
+            return Collections.EMPTY_LIST;
         }
     }
 
+    protected List<RowError> flushKuduSession(final KuduSession kuduSession) 
throws KuduException {
+        final List<OperationResponse> responses = kuduSession.flush();
+        // RowErrors will only be present in the OperationResponses in this 
case if the flush mode
+        // selected is MANUAL_FLUSH. It will be empty otherwise.
+        return getRowErrors(responses);
+    }
+
+    protected List<RowError> closeKuduSession(final KuduSession kuduSession) 
throws KuduException {
+        final List<OperationResponse> responses = kuduSession.close();
+        // RowErrors will only be present in the OperationResponses in this 
case if the flush mode
+        // selected is MANUAL_FLUSH, since the underlying implementation of 
kuduSession.close() returns
+        // the OperationResponses from a flush() call.
+        return getRowErrors(responses);
+    }
 
     @OnStopped
     public void shutdown() throws Exception {
@@ -410,4 +427,11 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
             return String.format("PutKudu[%s]-client-%d", identifier, 
threadCount.getAndIncrement());
         }
     }
+
+    private List<RowError> getRowErrors(final List<OperationResponse> 
responses) {
+        return responses.stream()
+                .filter(OperationResponse::hasRowError)
+                .map(OperationResponse::getRowError)
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
new file mode 100644
index 0000000000..d502741688
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AutoFlushSyncPutKuduResult extends PutKuduResult {
+    private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
+
+    public AutoFlushSyncPutKuduResult() {
+        super();
+        this.flowFileRowErrorsMap = new HashMap<>();
+    }
+
+    @Override
+    public void recordOperation(final Operation operation) {
+        // this should be a no-op because we don't need to record Operation's 
origins
+        // for buffered flush when using AUTO_FLUSH_SYNC
+        return;
+    }
+
+    @Override
+    public void addError(final RowError rowError) {
+        final List<RowError> rowErrors = 
flowFileRowErrorsMap.getOrDefault(flowFile, new ArrayList<>());
+        rowErrors.add(rowError);
+        flowFileRowErrorsMap.put(flowFile, rowErrors);
+    }
+
+    @Override
+    public void addErrors(final List<RowError> rowErrors) {
+        // This is a no-op because we would never be in a situation where we'd 
have to add a collection of RowError
+        // using this Flush Mode. Since we do not keep Operation to FlowFile 
mapping, it will also be impossible to resolve
+        // RowErrors to the FlowFile that caused them, hence this method 
is intentionally left as a no-op for AUTO_FLUSH_SYNC
+        return;
+    }
+
+    @Override
+    public boolean hasRowErrorsOrFailures() {
+        if (!flowFileFailures.isEmpty()) {
+            return true;
+        }
+
+        for (final Map.Entry<FlowFile, List<RowError>> entry : 
flowFileRowErrorsMap.entrySet()) {
+            if (!entry.getValue().isEmpty()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
+        return flowFileRowErrorsMap.getOrDefault(flowFile, 
Collections.EMPTY_LIST);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index eaa07617c2..a1317b3fd8 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -22,10 +22,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -357,53 +355,52 @@ public class PutKudu extends AbstractKuduProcessor {
     }
 
     private void processFlowFiles(final ProcessContext context, final 
ProcessSession session, final List<FlowFile> flowFiles, final KuduClient 
kuduClient) {
-        final Map<FlowFile, Integer> processedRecords = new HashMap<>();
-        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
-        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
-        final List<RowError> pendingRowErrors = new ArrayList<>();
-
         final KuduSession kuduSession = createKuduSession(kuduClient);
+        final PutKuduResult putKuduResult = flushMode == 
SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
+                ? new AutoFlushSyncPutKuduResult() : new 
StandardPutKuduResult();
         try {
             processRecords(flowFiles,
-                    processedRecords,
-                    flowFileFailures,
-                    operationFlowFileMap,
-                    pendingRowErrors,
                     session,
                     context,
                     kuduClient,
-                    kuduSession);
+                    kuduSession,
+                    putKuduResult);
         } finally {
             try {
-                flushKuduSession(kuduSession, true, pendingRowErrors);
+                final List<RowError> rowErrors = closeKuduSession(kuduSession);
+                if (flushMode == 
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+                    
putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+                } else {
+                    putKuduResult.addErrors(rowErrors);
+                }
             } catch (final KuduException|RuntimeException e) {
                 getLogger().error("KuduSession.close() Failed", e);
             }
         }
 
-        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || 
!flowFileFailures.isEmpty())) {
-            logFailures(pendingRowErrors, operationFlowFileMap);
+        putKuduResult.resolveFlowFileToRowErrorAssociations();
+
+        if (isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) {
+            logFailures(putKuduResult);
             session.rollback();
             context.yield();
         } else {
-            transferFlowFiles(flowFiles, processedRecords, flowFileFailures, 
operationFlowFileMap, pendingRowErrors, session);
+            transferFlowFiles(flowFiles, session, putKuduResult);
         }
     }
 
     private void processRecords(final List<FlowFile> flowFiles,
-                                 final Map<FlowFile, Integer> processedRecords,
-                                 final Map<FlowFile, Object> flowFileFailures,
-                                 final Map<Operation, FlowFile> 
operationFlowFileMap,
-                                 final List<RowError> pendingRowErrors,
-                                 final ProcessSession session,
-                                 final ProcessContext context,
-                                 final KuduClient kuduClient,
-                                 final KuduSession kuduSession) {
+                                final ProcessSession session,
+                                final ProcessContext context,
+                                final KuduClient kuduClient,
+                                final KuduSession kuduSession,
+                                final PutKuduResult putKuduResult) {
         final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
         int bufferedRecords = 0;
         OperationType prevOperationType = OperationType.INSERT;
         for (FlowFile flowFile : flowFiles) {
+            putKuduResult.setFlowFile(flowFile);
             try (final InputStream in = session.read(flowFile);
                  final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
 
@@ -472,7 +469,12 @@ public class PutKudu extends AbstractKuduProcessor {
                         // ignore operations.
                         if (!supportsInsertIgnoreOp && prevOperationType != 
operationType
                                 && (prevOperationType == 
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
-                            flushKuduSession(kuduSession, false, 
pendingRowErrors);
+                            final List<RowError> rowErrors = 
flushKuduSession(kuduSession);
+                            if (flushMode == 
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+                                
putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+                            } else {
+                                putKuduResult.addErrors(rowErrors);
+                            }
                             
kuduSession.setIgnoreAllDuplicateRows(operationType == 
OperationType.INSERT_IGNORE);
                         }
                         prevOperationType = operationType;
@@ -481,34 +483,35 @@ public class PutKudu extends AbstractKuduProcessor {
                         Operation operation = 
createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, 
lowercaseFields, kuduTable);
                         // We keep track of mappings between Operations and 
their origins,
                         // so that we know which FlowFiles should be marked 
failure after buffered flush.
-                        operationFlowFileMap.put(operation, flowFile);
+                        putKuduResult.recordOperation(operation);
 
                         // Flush mutation buffer of KuduSession to avoid 
"MANUAL_FLUSH is enabled
                         // but the buffer is too big" error. This can happen 
when flush mode is
                         // MANUAL_FLUSH and a FlowFile has more than one 
records.
                         if (bufferedRecords == batchSize && flushMode == 
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                             bufferedRecords = 0;
-                            flushKuduSession(kuduSession, false, 
pendingRowErrors);
+                            final List<RowError> rowErrors = 
flushKuduSession(kuduSession);
+                            putKuduResult.addErrors(rowErrors);
                         }
 
                         // OperationResponse is returned only when flush mode 
is set to AUTO_FLUSH_SYNC
-                        OperationResponse response = 
kuduSession.apply(operation);
+                        final OperationResponse response = 
kuduSession.apply(operation);
                         if (response != null && response.hasRowError()) {
                             // Stop processing the records on the first error.
                             // Note that Kudu does not support rolling back of 
previous operations.
-                            flowFileFailures.put(flowFile, 
response.getRowError());
+                            putKuduResult.addFailure(response.getRowError());
                             break recordReaderLoop;
                         }
 
                         bufferedRecords++;
-                        processedRecords.merge(flowFile, 1, Integer::sum);
+                        putKuduResult.incrementProcessedRecordsForFlowFile();
                     }
 
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
                 getLogger().error("Failed to push {} to Kudu", flowFile, ex);
-                flowFileFailures.put(flowFile, ex);
+                putKuduResult.addFailure(ex);
             }
         }
     }
@@ -575,38 +578,28 @@ public class PutKudu extends AbstractKuduProcessor {
     }
 
     private void transferFlowFiles(final List<FlowFile> flowFiles,
-                                   final Map<FlowFile, Integer> 
processedRecords,
-                                   final Map<FlowFile, Object> 
flowFileFailures,
-                                   final Map<Operation, FlowFile> 
operationFlowFileMap,
-                                   final List<RowError> pendingRowErrors,
-                                   final ProcessSession session) {
-        // Find RowErrors for each FlowFile
-        final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream()
-                .filter(e -> operationFlowFileMap.get(e.getOperation()) != 
null)
-                .collect(
-                    Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation()))
-                );
-
+                                   final ProcessSession session,
+                                   final PutKuduResult putKuduResult) {
         long totalCount = 0L;
         for (FlowFile flowFile : flowFiles) {
-            final int count = processedRecords.getOrDefault(flowFile, 0);
+            final int count = 
putKuduResult.getProcessedRecordsForFlowFile(flowFile);
             totalCount += count;
-            final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
+            final List<RowError> rowErrors = 
putKuduResult.getRowErrorsForFlowFile(flowFile);
 
-            if (rowErrors != null) {
+            if (rowErrors != null && !rowErrors.isEmpty()) {
                 rowErrors.forEach(rowError -> getLogger().error("Failed to 
write due to {}", rowError.toString()));
                 flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
Integer.toString(count - rowErrors.size()));
-                totalCount -= rowErrors.size(); // Don't include error rows in 
the the counter.
+                totalCount -= rowErrors.size(); // Don't include error rows in 
the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
                 flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count));
 
-                if (flowFileFailures.containsKey(flowFile)) {
-                    getLogger().error("Failed to write due to {}", 
flowFileFailures.get(flowFile));
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
+                if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile)) {
                     session.transfer(flowFile, REL_SUCCESS);
                     session.getProvenanceReporter().send(flowFile, 
"Successfully added FlowFile to Kudu");
+                } else {
+                    getLogger().error("Failed to write due to {}", 
putKuduResult.getFailureForFlowFile(flowFile));
+                    session.transfer(flowFile, REL_FAILURE);
                 }
             }
         }
@@ -614,15 +607,14 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
-    private void logFailures(final List<RowError> pendingRowErrors, final 
Map<Operation, FlowFile> operationFlowFileMap) {
-        final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream().collect(
-            Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation())));
-
-        for (final Map.Entry<FlowFile, List<RowError>> entry : 
flowFileRowErrors.entrySet()) {
-            final FlowFile flowFile = entry.getKey();
-            final List<RowError> errors = entry.getValue();
+    private void logFailures(final PutKuduResult putKuduResult) {
+        final Set<FlowFile> processedFlowFiles = 
putKuduResult.getProcessedFlowFiles();
+        for (final FlowFile flowFile : processedFlowFiles) {
+            final List<RowError> errors = 
putKuduResult.getRowErrorsForFlowFile(flowFile);
+            if (!errors.isEmpty()) {
+                getLogger().error("Could not write {} to Kudu due to: {}", 
flowFile, errors);
+            }
 
-            getLogger().error("Could not write {} to Kudu due to: {}", 
flowFile, errors);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
new file mode 100644
index 0000000000..f46a65d90d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class PutKuduResult {
+    protected FlowFile flowFile;
+    protected final Map<FlowFile, Object> flowFileFailures;
+    private final Set<FlowFile> processedFlowFiles;
+    private final Map<FlowFile, Integer> processedRecords;
+
+    public PutKuduResult() {
+        this.flowFile = null;
+
+        this.flowFileFailures = new HashMap<>();
+        this.processedFlowFiles = new HashSet<>();
+        this.processedRecords = new HashMap<>();
+    }
+
+    public void setFlowFile(final FlowFile flowFile) {
+        this.flowFile = flowFile;
+        processedFlowFiles.add(flowFile);
+    }
+
+    public Set<FlowFile> getProcessedFlowFiles() {
+        return this.processedFlowFiles;
+    }
+
+    public int getProcessedRecordsForFlowFile(final FlowFile flowFile) {
+        return this.processedRecords.getOrDefault(flowFile, 0);
+    }
+
+    /**
+     * Increments the number of {@link Record}s that has been successfully 
processed for this {@link FlowFile}
+     */
+    public void incrementProcessedRecordsForFlowFile() {
+        final int newCount = this.processedRecords.getOrDefault(flowFile, 0) + 
1;
+        this.processedRecords.put(flowFile, newCount);
+    }
+
+    /**
+     * Records an {@link Operation} being processed for a specific {@link 
FlowFile}
+     * @param operation the {@link Operation} to record
+     */
+    public abstract void recordOperation(final Operation operation);
+
+    /**
+     * Records a {@link RowError} for the particular {@link FlowFile} that's 
being processed
+     * @param rowError the {@link RowError} to add
+     */
+    public abstract void addError(final RowError rowError);
+
+    /**
+     * Records a {@link List} of {@link RowError}s for the particular {@link 
FlowFile} that's being processed
+     * @param rowErrors the {@link List} of {@link RowError}s to add
+     */
+    public void addErrors(final List<RowError> rowErrors) {
+        for (final RowError rowError : rowErrors) {
+            addError(rowError);
+        }
+    }
+
+    /**
+     * Records a failure (an {@link Exception} or a {@link RowError}) for the 
particular {@link FlowFile} that's being processed.
+     * A failure is defined as anything that stops the processing of the 
records in a {@link FlowFile}
+     * @param failure the {@link Exception} or {@link RowError} to add
+     */
+    public void addFailure(final Object failure) {
+        if (flowFileFailures.containsKey(flowFile)) {
+            throw new IllegalStateException("A failure has already previously 
occurred while processing FlowFile.");
+        }
+        flowFileFailures.put(flowFile, failure);
+    }
+
+
+    /**
+     * Resolves the associations between {@link FlowFile} and the {@link 
RowError}s that occurred
+     * while processing them. This is only applicable in batch session 
flushes, namely when
+     * using the {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND} 
and
+     * {@code SessionConfiguration.FlushMode.MANUAL_FLUSH} flush modes. 
Otherwise, this
+     * function should be a no-op. This function should only be called once 
finished with processing
+     * all {@link FlowFile}s in a batch.
+     */
+    public void resolveFlowFileToRowErrorAssociations() {
+        return;
+    }
+
+    /**
+     * Checks whether there was a failure (i.e. either an {@link Exception} or 
{@link RowError} that happened during processing)
+     * @return {@code true} if there was a {@link Exception} or a {@link 
RowError} that happened during processing, {@code false} otherwise
+     */
+    public abstract boolean hasRowErrorsOrFailures();
+
+    /**
+     * Checks whether the {@link FlowFile} was processed successfully (i.e. no 
{@link Exception}s or
+     * {@link RowError}s occurred while processing the {@link FlowFile}).
+     *
+     * @param flowFile {@link FlowFile} to check
+     * @return {@code true} if processing the {@link FlowFile} did not 
incur any exceptions, {@code false} otherwise
+     */
+    public boolean isFlowFileProcessedSuccessfully(final FlowFile flowFile) {
+        return !flowFileFailures.containsKey(flowFile);
+    }
+
+    /**
+     * Returns the failure ({@link Exception} or {@link RowError}) that 
occurred while processing the {@link FlowFile}
+     * @param flowFile the {@link FlowFile} to check
+     * @return the {@link Exception} or {@link RowError} if one occurred while 
processing the given {@link FlowFile} or {@code null}
+     */
+    public Object getFailureForFlowFile(final FlowFile flowFile) {
+        return flowFileFailures.get(flowFile);
+    }
+
+    /**
+     * Retrieves the {@link RowError}s that have occurred when processing a 
{@link FlowFile}
+     * @param flowFile the {@link FlowFile} to retrieve the {@link RowError}s 
of
+     * @return a {@link List} of {@link RowError}s for the {@link FlowFile} or 
an empty {@link List} if no errors occurred
+     */
+    public abstract List<RowError> getRowErrorsForFlowFile(final FlowFile 
flowFile);
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
new file mode 100644
index 0000000000..7b4a61119b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class StandardPutKuduResult extends PutKuduResult {
+    private final Map<Operation, FlowFile> operationFlowFileMap;
+    private final List<RowError> pendingRowErrors;
+    private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
+
+    public StandardPutKuduResult() {
+        super();
+        this.operationFlowFileMap = new HashMap<>();
+        this.pendingRowErrors = new ArrayList<>();
+        this.flowFileRowErrorsMap = new HashMap<>();
+    }
+
+    @Override
+    public void recordOperation(final Operation operation) {
+        operationFlowFileMap.put(operation, flowFile);
+    }
+
+    @Override
+    public void addError(final RowError rowError) {
+        // When this class is used to store results from processing FlowFiles, 
the FlushMode
+        // is set to AUTO_FLUSH_BACKGROUND or MANUAL_FLUSH. In either case, we 
won't know which
+        // FlowFile/Record we are currently processing as the RowErrors are 
obtained from the KuduSession
+        // post-processing of the FlowFile/Record
+        this.pendingRowErrors.add(rowError);
+    }
+
+    @Override
+    public void resolveFlowFileToRowErrorAssociations() {
+        flowFileRowErrorsMap.putAll(pendingRowErrors.stream()
+                .filter(e -> operationFlowFileMap.get(e.getOperation()) != 
null)
+                .collect(
+                        Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation()))
+                )
+        );
+
+        pendingRowErrors.clear();
+    }
+
+    @Override
+    public boolean hasRowErrorsOrFailures() {
+        if (!flowFileFailures.isEmpty()) {
+            return true;
+        }
+
+        return flowFileRowErrorsMap.entrySet()
+                .stream()
+                .anyMatch(entry -> !entry.getValue().isEmpty());
+    }
+
+    @Override
+    public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
+        return flowFileRowErrorsMap.getOrDefault(flowFile, 
Collections.EMPTY_LIST);
+    }
+}

Reply via email to