This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3719fddf84 NIFI-12700: refactored PutKudu to optimize memory handling
for AUTO_FLUSH_SYNC flush mode (unbatched flush)
3719fddf84 is described below
commit 3719fddf84ffcf18ed29d87ee931aa1acc1699d3
Author: emiliosetiadarma <[email protected]>
AuthorDate: Mon Jan 22 01:28:09 2024 -0800
NIFI-12700: refactored PutKudu to optimize memory handling for
AUTO_FLUSH_SYNC flush mode (unbatched flush)
NIFI-12700: made changes based on PR comments. Simplified the statements
that determine whether there are FlowFile failures or RowErrors. Moved the
extraction of RowErrors from OperationResponses into its own function.
Signed-off-by: Matt Burgess <[email protected]>
This closes #8322
---
.../processors/kudu/AbstractKuduProcessor.java | 40 ++++--
.../kudu/AutoFlushSyncPutKuduResult.java | 78 +++++++++++
.../org/apache/nifi/processors/kudu/PutKudu.java | 110 ++++++++--------
.../apache/nifi/processors/kudu/PutKuduResult.java | 144 +++++++++++++++++++++
.../processors/kudu/StandardPutKuduResult.java | 83 ++++++++++++
5 files changed, 388 insertions(+), 67 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 47c1a34903..b44f2330ee 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -22,6 +22,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
@@ -35,6 +36,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -217,19 +220,33 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
}
}
- protected void flushKuduSession(final KuduSession kuduSession, boolean
close, final List<RowError> rowErrors) throws KuduException {
- final List<OperationResponse> responses = close ? kuduSession.close()
: kuduSession.flush();
-
+ /**
+ * Get the pending errors from the active {@link KuduSession}. This will
only be applicable if the flushMode is
+ * {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}.
+ * @return a {@link List} of pending {@link RowError}s
+ */
+ protected List<RowError> getPendingRowErrorsFromKuduSession(final
KuduSession kuduSession) {
if (kuduSession.getFlushMode() ==
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+ return
Arrays.asList(kuduSession.getPendingErrors().getRowErrors());
} else {
- responses.stream()
- .filter(OperationResponse::hasRowError)
- .map(OperationResponse::getRowError)
- .forEach(rowErrors::add);
+ return Collections.EMPTY_LIST;
}
}
+ protected List<RowError> flushKuduSession(final KuduSession kuduSession)
throws KuduException {
+ final List<OperationResponse> responses = kuduSession.flush();
+ // RowErrors will only be present in the OperationResponses in this
case if the flush mode
+ // selected is MANUAL_FLUSH. It will be empty otherwise.
+ return getRowErrors(responses);
+ }
+
+ protected List<RowError> closeKuduSession(final KuduSession kuduSession)
throws KuduException {
+ final List<OperationResponse> responses = kuduSession.close();
+ // RowErrors will only be present in the OperationResponses in this
case if the flush mode
+ // selected is MANUAL_FLUSH, since the underlying implementation of
kuduSession.close() returns
+ // the OperationResponses from a flush() call.
+ return getRowErrors(responses);
+ }
@OnStopped
public void shutdown() throws Exception {
@@ -410,4 +427,11 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
return String.format("PutKudu[%s]-client-%d", identifier,
threadCount.getAndIncrement());
}
}
+
+ private List<RowError> getRowErrors(final List<OperationResponse>
responses) {
+ return responses.stream()
+ .filter(OperationResponse::hasRowError)
+ .map(OperationResponse::getRowError)
+ .collect(Collectors.toList());
+ }
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
new file mode 100644
index 0000000000..d502741688
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AutoFlushSyncPutKuduResult extends PutKuduResult {
+ private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
+
+ public AutoFlushSyncPutKuduResult() {
+ super();
+ this.flowFileRowErrorsMap = new HashMap<>();
+ }
+
+ @Override
+ public void recordOperation(final Operation operation) {
+ // This is a no-op: AUTO_FLUSH_SYNC does not batch operations, so there
is no need to
+ // record which FlowFile each Operation originated from
+ return;
+ }
+
+ @Override
+ public void addError(final RowError rowError) {
+ final List<RowError> rowErrors =
flowFileRowErrorsMap.getOrDefault(flowFile, new ArrayList<>());
+ rowErrors.add(rowError);
+ flowFileRowErrorsMap.put(flowFile, rowErrors);
+ }
+
+ @Override
+ public void addErrors(final List<RowError> rowErrors) {
+ // This is a no-op because we would never be in a situation where we'd
have to add a collection of RowError
+ // using this Flush Mode. Since we do not keep Operation to FlowFile
mapping, it will also be impossible to resolve
+ // RowErrors to the FlowFile that caused them, hence this method
should never be implemented for AUTO_FLUSH_SYNC
+ return;
+ }
+
+ @Override
+ public boolean hasRowErrorsOrFailures() {
+ if (!flowFileFailures.isEmpty()) {
+ return true;
+ }
+
+ for (final Map.Entry<FlowFile, List<RowError>> entry :
flowFileRowErrorsMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
+ return flowFileRowErrorsMap.getOrDefault(flowFile,
Collections.EMPTY_LIST);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index eaa07617c2..a1317b3fd8 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -22,10 +22,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -357,53 +355,52 @@ public class PutKudu extends AbstractKuduProcessor {
}
private void processFlowFiles(final ProcessContext context, final
ProcessSession session, final List<FlowFile> flowFiles, final KuduClient
kuduClient) {
- final Map<FlowFile, Integer> processedRecords = new HashMap<>();
- final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
- final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
- final List<RowError> pendingRowErrors = new ArrayList<>();
-
final KuduSession kuduSession = createKuduSession(kuduClient);
+ final PutKuduResult putKuduResult = flushMode ==
SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
+ ? new AutoFlushSyncPutKuduResult() : new
StandardPutKuduResult();
try {
processRecords(flowFiles,
- processedRecords,
- flowFileFailures,
- operationFlowFileMap,
- pendingRowErrors,
session,
context,
kuduClient,
- kuduSession);
+ kuduSession,
+ putKuduResult);
} finally {
try {
- flushKuduSession(kuduSession, true, pendingRowErrors);
+ final List<RowError> rowErrors = closeKuduSession(kuduSession);
+ if (flushMode ==
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+
putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+ } else {
+ putKuduResult.addErrors(rowErrors);
+ }
} catch (final KuduException|RuntimeException e) {
getLogger().error("KuduSession.close() Failed", e);
}
}
- if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() ||
!flowFileFailures.isEmpty())) {
- logFailures(pendingRowErrors, operationFlowFileMap);
+ putKuduResult.resolveFlowFileToRowErrorAssociations();
+
+ if (isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) {
+ logFailures(putKuduResult);
session.rollback();
context.yield();
} else {
- transferFlowFiles(flowFiles, processedRecords, flowFileFailures,
operationFlowFileMap, pendingRowErrors, session);
+ transferFlowFiles(flowFiles, session, putKuduResult);
}
}
private void processRecords(final List<FlowFile> flowFiles,
- final Map<FlowFile, Integer> processedRecords,
- final Map<FlowFile, Object> flowFileFailures,
- final Map<Operation, FlowFile>
operationFlowFileMap,
- final List<RowError> pendingRowErrors,
- final ProcessSession session,
- final ProcessContext context,
- final KuduClient kuduClient,
- final KuduSession kuduSession) {
+ final ProcessSession session,
+ final ProcessContext context,
+ final KuduClient kuduClient,
+ final KuduSession kuduSession,
+ final PutKuduResult putKuduResult) {
final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
int bufferedRecords = 0;
OperationType prevOperationType = OperationType.INSERT;
for (FlowFile flowFile : flowFiles) {
+ putKuduResult.setFlowFile(flowFile);
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
@@ -472,7 +469,12 @@ public class PutKudu extends AbstractKuduProcessor {
// ignore operations.
if (!supportsInsertIgnoreOp && prevOperationType !=
operationType
&& (prevOperationType ==
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
- flushKuduSession(kuduSession, false,
pendingRowErrors);
+ final List<RowError> rowErrors =
flushKuduSession(kuduSession);
+ if (flushMode ==
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+
putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+ } else {
+ putKuduResult.addErrors(rowErrors);
+ }
kuduSession.setIgnoreAllDuplicateRows(operationType ==
OperationType.INSERT_IGNORE);
}
prevOperationType = operationType;
@@ -481,34 +483,35 @@ public class PutKudu extends AbstractKuduProcessor {
Operation operation =
createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull,
lowercaseFields, kuduTable);
// We keep track of mappings between Operations and
their origins,
// so that we know which FlowFiles should be marked
failure after buffered flush.
- operationFlowFileMap.put(operation, flowFile);
+ putKuduResult.recordOperation(operation);
// Flush mutation buffer of KuduSession to avoid
"MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen
when flush mode is
// MANUAL_FLUSH and a FlowFile has more than one
records.
if (bufferedRecords == batchSize && flushMode ==
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
bufferedRecords = 0;
- flushKuduSession(kuduSession, false,
pendingRowErrors);
+ final List<RowError> rowErrors =
flushKuduSession(kuduSession);
+ putKuduResult.addErrors(rowErrors);
}
// OperationResponse is returned only when flush mode
is set to AUTO_FLUSH_SYNC
- OperationResponse response =
kuduSession.apply(operation);
+ final OperationResponse response =
kuduSession.apply(operation);
if (response != null && response.hasRowError()) {
// Stop processing the records on the first error.
// Note that Kudu does not support rolling back of
previous operations.
- flowFileFailures.put(flowFile,
response.getRowError());
+ putKuduResult.addFailure(response.getRowError());
break recordReaderLoop;
}
bufferedRecords++;
- processedRecords.merge(flowFile, 1, Integer::sum);
+ putKuduResult.incrementProcessedRecordsForFlowFile();
}
record = recordSet.next();
}
} catch (Exception ex) {
getLogger().error("Failed to push {} to Kudu", flowFile, ex);
- flowFileFailures.put(flowFile, ex);
+ putKuduResult.addFailure(ex);
}
}
}
@@ -575,38 +578,28 @@ public class PutKudu extends AbstractKuduProcessor {
}
private void transferFlowFiles(final List<FlowFile> flowFiles,
- final Map<FlowFile, Integer>
processedRecords,
- final Map<FlowFile, Object>
flowFileFailures,
- final Map<Operation, FlowFile>
operationFlowFileMap,
- final List<RowError> pendingRowErrors,
- final ProcessSession session) {
- // Find RowErrors for each FlowFile
- final Map<FlowFile, List<RowError>> flowFileRowErrors =
pendingRowErrors.stream()
- .filter(e -> operationFlowFileMap.get(e.getOperation()) !=
null)
- .collect(
- Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation()))
- );
-
+ final ProcessSession session,
+ final PutKuduResult putKuduResult) {
long totalCount = 0L;
for (FlowFile flowFile : flowFiles) {
- final int count = processedRecords.getOrDefault(flowFile, 0);
+ final int count =
putKuduResult.getProcessedRecordsForFlowFile(flowFile);
totalCount += count;
- final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
+ final List<RowError> rowErrors =
putKuduResult.getRowErrorsForFlowFile(flowFile);
- if (rowErrors != null) {
+ if (rowErrors != null && !rowErrors.isEmpty()) {
rowErrors.forEach(rowError -> getLogger().error("Failed to
write due to {}", rowError.toString()));
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR,
Integer.toString(count - rowErrors.size()));
- totalCount -= rowErrors.size(); // Don't include error rows in
the the counter.
+ totalCount -= rowErrors.size(); // Don't include error rows in
the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count));
- if (flowFileFailures.containsKey(flowFile)) {
- getLogger().error("Failed to write due to {}",
flowFileFailures.get(flowFile));
- session.transfer(flowFile, REL_FAILURE);
- } else {
+ if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile)) {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile,
"Successfully added FlowFile to Kudu");
+ } else {
+ getLogger().error("Failed to write due to {}",
putKuduResult.getFailureForFlowFile(flowFile));
+ session.transfer(flowFile, REL_FAILURE);
}
}
}
@@ -614,15 +607,14 @@ public class PutKudu extends AbstractKuduProcessor {
session.adjustCounter("Records Inserted", totalCount, false);
}
- private void logFailures(final List<RowError> pendingRowErrors, final
Map<Operation, FlowFile> operationFlowFileMap) {
- final Map<FlowFile, List<RowError>> flowFileRowErrors =
pendingRowErrors.stream().collect(
- Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation())));
-
- for (final Map.Entry<FlowFile, List<RowError>> entry :
flowFileRowErrors.entrySet()) {
- final FlowFile flowFile = entry.getKey();
- final List<RowError> errors = entry.getValue();
+ private void logFailures(final PutKuduResult putKuduResult) {
+ final Set<FlowFile> processedFlowFiles =
putKuduResult.getProcessedFlowFiles();
+ for (final FlowFile flowFile : processedFlowFiles) {
+ final List<RowError> errors =
putKuduResult.getRowErrorsForFlowFile(flowFile);
+ if (!errors.isEmpty()) {
+ getLogger().error("Could not write {} to Kudu due to: {}",
flowFile, errors);
+ }
- getLogger().error("Could not write {} to Kudu due to: {}",
flowFile, errors);
}
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
new file mode 100644
index 0000000000..f46a65d90d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class PutKuduResult {
+ protected FlowFile flowFile;
+ protected final Map<FlowFile, Object> flowFileFailures;
+ private final Set<FlowFile> processedFlowFiles;
+ private final Map<FlowFile, Integer> processedRecords;
+
+ public PutKuduResult() {
+ this.flowFile = null;
+
+ this.flowFileFailures = new HashMap<>();
+ this.processedFlowFiles = new HashSet<>();
+ this.processedRecords = new HashMap<>();
+ }
+
+ public void setFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ processedFlowFiles.add(flowFile);
+ }
+
+ public Set<FlowFile> getProcessedFlowFiles() {
+ return this.processedFlowFiles;
+ }
+
+ public int getProcessedRecordsForFlowFile(final FlowFile flowFile) {
+ return this.processedRecords.getOrDefault(flowFile, 0);
+ }
+
+ /**
+ * Increments the number of {@link Record}s that have been successfully
processed for the current {@link FlowFile} (the one last passed to {@link #setFlowFile(FlowFile)})
+ */
+ public void incrementProcessedRecordsForFlowFile() {
+ final int newCount = this.processedRecords.getOrDefault(flowFile, 0) +
1;
+ this.processedRecords.put(flowFile, newCount);
+ }
+
+ /**
+ * Records an {@link Operation} being processed for a specific {@link
FlowFile}
+ * @param operation the {@link Operation} to record
+ */
+ public abstract void recordOperation(final Operation operation);
+
+ /**
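+ * Records a {@link RowError}. Depending on the implementation, it is either associated with the current {@link FlowFile}
or kept pending until {@link #resolveFlowFileToRowErrorAssociations()} maps it to its originating {@link FlowFile}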
+ * @param rowError the {@link RowError} to add
+ */
+ public abstract void addError(final RowError rowError);
+
+ /**
+ * Records a {@link List} of {@link RowError}s for the particular {@link
FlowFile} that's being processed
+ * @param rowErrors the {@link List} of {@link RowError}s to add
+ */
+ public void addErrors(final List<RowError> rowErrors) {
+ for (final RowError rowError : rowErrors) {
+ addError(rowError);
+ }
+ }
+
+ /**
+ * Records a failure (an {@link Exception} or a {@link RowError}) for the
particular {@link FlowFile} that's being processed.
+ * A failure is defined as anything that stops the processing of the
records in a {@link FlowFile}
+ * @param failure the {@link Exception} or {@link RowError} to add
+ */
+ public void addFailure(final Object failure) {
+ if (flowFileFailures.containsKey(flowFile)) {
+ throw new IllegalStateException("A failure has already previously
occurred while processing FlowFile.");
+ }
+ flowFileFailures.put(flowFile, failure);
+ }
+
+
+ /**
+ * Resolves the associations between {@link FlowFile} and the {@link
RowError}s that occurred
+ * while processing them. This is only applicable to batched session
flushes, namely when
+ * using the {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}
and
+ * {@code SessionConfiguration.FlushMode.MANUAL_FLUSH} flush modes.
Otherwise, this
+ * function should be a no-op. This function should only be called once
finished with processing
+ * all {@link FlowFile}s in a batch.
+ */
+ public void resolveFlowFileToRowErrorAssociations() {
+ return;
+ }
+
+ /**
+ * Checks whether there was a failure (i.e. either an {@link Exception} or
{@link RowError} that happened during processing)
+ * @return {@code true} if there was a {@link Exception} or a {@link
RowError} that happened during processing, {@code false} otherwise
+ */
+ public abstract boolean hasRowErrorsOrFailures();
+
+ /**
+ * Checks whether the {@link FlowFile} was processed successfully, i.e. no
failure ({@link Exception} or
+ * {@link RowError}) was recorded for it via {@link #addFailure(Object)}; RowErrors collected from session flushes are not checked here.
+ *
+ * @param flowFile {@link FlowFile} to check
+ * @return {@code true} if no failure was recorded while processing the
{@link FlowFile}, {@code false} otherwise
+ */
+ public boolean isFlowFileProcessedSuccessfully(final FlowFile flowFile) {
+ return !flowFileFailures.containsKey(flowFile);
+ }
+
+ /**
+ * Returns the failure ({@link Exception} or {@link RowError}) that
occurred while processing the {@link FlowFile}
+ * @param flowFile the {@link FlowFile} to check
+ * @return the {@link Exception} or {@link RowError} if one occurred while
processing the given {@link FlowFile} or {@code null}
+ */
+ public Object getFailureForFlowFile(final FlowFile flowFile) {
+ return flowFileFailures.get(flowFile);
+ }
+
+ /**
+ * Retrieves the {@link RowError}s that have occurred when processing a
{@link FlowFile}
+ * @param flowFile the {@link FlowFile} to retrieve the {@link RowError}s
of
+ * @return a {@link List} of {@link RowError}s for the {@link FlowFile}, or
an empty {@link List} if there are no errors
+ */
+ public abstract List<RowError> getRowErrorsForFlowFile(final FlowFile
flowFile);
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
new file mode 100644
index 0000000000..7b4a61119b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowError;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class StandardPutKuduResult extends PutKuduResult {
+ private final Map<Operation, FlowFile> operationFlowFileMap;
+ private final List<RowError> pendingRowErrors;
+ private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
+
+ public StandardPutKuduResult() {
+ super();
+ this.operationFlowFileMap = new HashMap<>();
+ this.pendingRowErrors = new ArrayList<>();
+ this.flowFileRowErrorsMap = new HashMap<>();
+ }
+
+ @Override
+ public void recordOperation(final Operation operation) {
+ operationFlowFileMap.put(operation, flowFile);
+ }
+
+ @Override
+ public void addError(final RowError rowError) {
+ // When this class is used to store results from processing FlowFiles,
the FlushMode
+ // is set to AUTO_FLUSH_BACKGROUND or MANUAL_FLUSH. In either case, we
won't know which
+ // FlowFile/Record we are currently processing as the RowErrors are
obtained from the KuduSession
+ // post-processing of the FlowFile/Record
+ this.pendingRowErrors.add(rowError);
+ }
+
+ @Override
+ public void resolveFlowFileToRowErrorAssociations() {
+ flowFileRowErrorsMap.putAll(pendingRowErrors.stream()
+ .filter(e -> operationFlowFileMap.get(e.getOperation()) !=
null)
+ .collect(
+ Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation()))
+ )
+ );
+
+ pendingRowErrors.clear();
+ }
+
+ @Override
+ public boolean hasRowErrorsOrFailures() {
+ if (!flowFileFailures.isEmpty()) {
+ return true;
+ }
+
+ return flowFileRowErrorsMap.entrySet()
+ .stream()
+ .anyMatch(entry -> !entry.getValue().isEmpty());
+ }
+
+ @Override
+ public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
+ return flowFileRowErrorsMap.getOrDefault(flowFile,
Collections.EMPTY_LIST);
+ }
+}