This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2232e28  NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
2232e28 is described below

commit 2232e280523f47daebf3ce5ff5f0080261d78928
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 27 21:18:36 2020 -0400

    NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 58 +++++++++++++++++++++-
 1 file changed, 57 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index c811b6b..1a649b0 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.PropertyValue;
@@ -89,6 +90,12 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
 
 public class PutKudu extends AbstractKuduProcessor {
 
+    /** Failure Strategy option: a FlowFile whose Records could not all be inserted is routed to 'failure'. */
+    static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure",
+        "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
+    /** Failure Strategy option: any Record that cannot be inserted causes the whole session to be rolled back. */
+    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
+        "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
+            "it will block any subsequent data from being pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
+
     protected static final PropertyDescriptor TABLE_NAME = new Builder()
         .name("Table Name")
         .description("The name of the Kudu Table to put data into")
@@ -106,6 +113,15 @@ public class PutKudu extends AbstractKuduProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    // Chooses how a batch with failed Records is handled: route to 'failure' (default) or roll back the session.
+    static final PropertyDescriptor FAILURE_STRATEGY = new Builder()
+        .name("Failure Strategy")
+        .displayName("Failure Strategy")
+        .description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
+        .required(true)
+        .allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK)
+        .defaultValue(FAILURE_STRATEGY_ROUTE.getValue())
+        .build();
+
     protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
         .name("Skip head line")
         .description("Deprecated. Used to ignore header lines, but this should 
be handled by a RecordReader " +
@@ -255,12 +271,14 @@ public class PutKudu extends AbstractKuduProcessor {
     private volatile SessionConfiguration.FlushMode flushMode;
     private volatile Function<Record, OperationType> recordPathOperationType;
     private volatile RecordPath dataRecordPath;
+    // Raw value of the FAILURE_STRATEGY property, read from the ProcessContext (presumably when the processor is scheduled); consulted by isRollbackOnFailure().
+    private volatile String failureStrategy;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(KUDU_MASTERS);
         properties.add(TABLE_NAME);
+        properties.add(FAILURE_STRATEGY);
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(KERBEROS_PRINCIPAL);
         properties.add(KERBEROS_PASSWORD);
@@ -305,6 +323,12 @@ public class PutKudu extends AbstractKuduProcessor {
 
         final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
         dataRecordPath = dataRecordPathValue == null ? null : 
RecordPath.compile(dataRecordPathValue);
+
+        failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
+    }
+
+    private boolean isRollbackOnFailure() {
+        return 
FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
     }
 
     @Override
@@ -462,10 +486,20 @@ public class PutKudu extends AbstractKuduProcessor {
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
+                getLogger().error("Failed to push {} to Kudu", new Object[] 
{flowFile}, ex);
                 flowFileFailures.put(flowFile, ex);
             }
         }
 
+        // If configured to rollback on failure, and there's at least one 
error, rollback the session and return.
+        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || 
!flowFileFailures.isEmpty())) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        // If any data is buffered, flush it.
         if (numBuffered > 0) {
             try {
                 flushKuduSession(kuduSession, true, pendingRowErrors);
@@ -479,6 +513,15 @@ public class PutKudu extends AbstractKuduProcessor {
             }
         }
 
+        // It's possible that there were no row errors when this was checked 
above, but flushing the Kudu session may have introduced
+        // one or more Row Errors. So we need to check again.
+        if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
         // Find RowErrors for each FlowFile
         final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream().collect(
             Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation())));
@@ -490,8 +533,9 @@ public class PutKudu extends AbstractKuduProcessor {
             final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
 
             if (rowErrors != null) {
-                rowErrors.forEach(rowError -> getLogger().error("Failed to 
write due to {}", new Object[]{rowError.toString()}));
+                rowErrors.forEach(rowError -> getLogger().error("Failed to 
write due to {}", new Object[] {rowError.toString()}));
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count - rowErrors.size()));
+                totalCount -= rowErrors.size(); // Don't include error rows in 
the the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count));
@@ -509,6 +553,18 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private void logFailures(final List<RowError> pendingRowErrors, final 
Map<Operation, FlowFile> operationFlowFileMap) {
+        final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream().collect(
+            Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation())));
+
+        for (final Map.Entry<FlowFile, List<RowError>> entry : 
flowFileRowErrors.entrySet()) {
+            final FlowFile flowFile = entry.getKey();
+            final List<RowError> errors = entry.getValue();
+
+            getLogger().error("Could not write {} to Kudu due to: {}", new 
Object[] {flowFile, errors});
+        }
+    }
+
     private String getEvaluatedProperty(PropertyDescriptor property, 
ProcessContext context, FlowFile flowFile) {
         PropertyValue evaluatedProperty = 
context.getProperty(property).evaluateAttributeExpressions(flowFile);
         if (property.isRequired() && evaluatedProperty == null) {

Reply via email to