This is an automated email from the ASF dual-hosted git repository.

mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 821ad8fe86 NIFI-15169 Ensure ExecuteSQLRecord routes streaming failures
821ad8fe86 is described below

commit 821ad8fe860873048a4b6d385839c17eeb87825f
Author: Rajmund Takacs <[email protected]>
AuthorDate: Mon Dec 1 15:04:07 2025 +0100

    NIFI-15169 Ensure ExecuteSQLRecord routes streaming failures
    
    Signed-off-by: Mark Bathori <[email protected]>
    
    This closes #10584.
---
 .../processors/standard/AbstractExecuteSQL.java    |  22 +++--
 .../processors/standard/TestExecuteSQLRecord.java  | 103 +++++++++++++++++++++
 2 files changed, 119 insertions(+), 6 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 29bd87d134..64b5350b1d 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -248,6 +248,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
         }
 
         final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+        Map<String, String> retainedFlowFileAttributes = null;
 
         final ComponentLog logger = getLogger();
         final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
@@ -404,8 +405,10 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                                     // If we've reached the batch size, send out the flow files
                                     if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                         session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                                        // Need to remove the original input file if it exists
+                                        // Need to remove the original input file if it exists,
+                                        // but save the attributes to add them to the failure flow file if it is created
                                         if (fileToProcess != null) {
+                                            retainedFlowFileAttributes = new HashMap<>(fileToProcess.getAttributes());
                                             session.remove(fileToProcess);
                                             fileToProcess = null;
                                         }
@@ -492,20 +495,27 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
         } catch (final ProcessException | SQLException e) {
             //If we had at least one result then it's OK to drop the original file, but if we had no results then
             //  pass the original flow file down the line to trigger downstream processors
-            if (fileToProcess == null) {
+            //  or create a new empty flow file with the original attributes if the original file no longer exists
+            FlowFile failureFlowFile = fileToProcess;
+            if (failureFlowFile == null && retainedFlowFileAttributes != null) {
+                failureFlowFile = session.create();
+                failureFlowFile = session.putAllAttributes(failureFlowFile, retainedFlowFileAttributes);
+            }
+
+            if (failureFlowFile == null) {
                 // This can happen if any exceptions occur while setting up the connection, statement, etc.
                 logger.error("Unable to execute SQL select query [{}]. No FlowFile to route to failure", selectQuery, e);
                 context.yield();
             } else {
                 if (context.hasIncomingConnection()) {
-                    logger.error("Unable to execute SQL select query [{}] for {} routing to failure", selectQuery, fileToProcess, e);
-                    fileToProcess = session.penalize(fileToProcess);
+                    logger.error("Unable to execute SQL select query [{}] for {} routing to failure", selectQuery, failureFlowFile, e);
+                    failureFlowFile = session.penalize(failureFlowFile);
                 } else {
                     logger.error("Unable to execute SQL select query [{}] 
routing to failure", selectQuery, e);
                     context.yield();
                 }
-                session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE, e.getMessage());
-                session.transfer(fileToProcess, REL_FAILURE);
+                session.putAttribute(failureFlowFile, RESULT_ERROR_MESSAGE, e.getMessage());
+                session.transfer(failureFlowFile, REL_FAILURE);
             }
         }
     }
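
Taken together, the hunks above implement a retain-then-recreate pattern: once
output batching ships a batch, the original input FlowFile is removed, but its
attributes are saved first; if result-set streaming fails later, an empty
FlowFile is created with those attributes so the failure relationship still
receives one. A condensed sketch of the pattern, using the same NiFi
ProcessSession calls as the patch (the surrounding onTrigger scaffolding is
elided):

    Map<String, String> retainedFlowFileAttributes = null;
    try {
        // ... stream result rows into resultSetFlowFiles ...
        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
            session.transfer(resultSetFlowFiles, REL_SUCCESS);
            if (fileToProcess != null) {
                // Save the attributes before removing the original input FlowFile
                retainedFlowFileAttributes = new HashMap<>(fileToProcess.getAttributes());
                session.remove(fileToProcess);
                fileToProcess = null;
            }
        }
    } catch (final ProcessException | SQLException e) {
        FlowFile failureFlowFile = fileToProcess;
        if (failureFlowFile == null && retainedFlowFileAttributes != null) {
            // Recreate an empty FlowFile carrying the original attributes
            failureFlowFile = session.create();
            failureFlowFile = session.putAllAttributes(failureFlowFile, retainedFlowFileAttributes);
        }
        // ... penalize and transfer failureFlowFile to REL_FAILURE, as above ...
    }
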
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index b035fb8f28..77da600b30 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@@ -24,9 +25,14 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.MockCsvRecordWriter;
 import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -38,6 +44,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.sql.Array;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -46,9 +54,11 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -418,6 +428,35 @@ class TestExecuteSQLRecord extends AbstractDatabaseConnectionServiceTest {
         runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 0);
     }
 
+    @Test
+    public void testStreamingExceptionWhileOutputBatchingRoutesOriginalFlowFileToFailure() throws Exception {
+        executeSql("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+        for (int i = 0; i < 5; i++) {
+            executeSql("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "1");
+        runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+
+        FailAfterNRecordSetWriterFactory recordWriter = new FailAfterNRecordSetWriterFactory(2);
+        runner.addControllerService("fail-writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "fail-writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.enqueue("SELECT * FROM TEST_NULL_INT", Map.of("retain.attr", "retain-value"));
+        runner.run();
+
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 2);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1);
+
+        MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).getFirst();
+        final String errorMessage = failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE);
+        assertTrue(errorMessage.endsWith("Simulated failure after 2 successful writes"), () -> "Unexpected error message: " + errorMessage);
+        failedFlowFile.assertAttributeEquals("retain.attr", "retain-value");
+    }
+
     public void invokeOnTriggerRecords(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String, String> attrs, final boolean setQueryProperty)
             throws InitializationException, SQLException {
 
@@ -626,4 +665,68 @@ class TestExecuteSQLRecord extends AbstractDatabaseConnectionServiceTest {
             }
         }
     }
+
+    private static final class FailAfterNRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
+        private final AtomicInteger successfulWrites = new AtomicInteger(0);
+        private final int failAfter;
+
+        FailAfterNRecordSetWriterFactory(final int failAfter) {
+            this.failAfter = failAfter;
+        }
+
+        @Override
+        public RecordSchema getSchema(final Map<String, String> variables, final RecordSchema readSchema) {
+            return readSchema;
+        }
+
+        @Override
+        public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out, final Map<String, String> variables) {
+            return new RecordSetWriter() {
+                private long currentRecordCount;
+
+                @Override
+                public WriteResult write(final RecordSet recordSet) throws IOException {
+                    if (successfulWrites.get() >= failAfter) {
+                        throw new IOException("Simulated failure after " + failAfter + " successful writes");
+                    }
+
+                    long count = 0;
+                    while (recordSet.next() != null) {
+                        count++;
+                    }
+                    currentRecordCount = count;
+                    successfulWrites.incrementAndGet();
+                    return WriteResult.of(Math.toIntExact(count), Collections.emptyMap());
+                }
+
+                @Override
+                public WriteResult write(final Record record) throws IOException {
+                    throw new UnsupportedOperationException("Record-by-record writes not supported");
+                }
+
+                @Override
+                public String getMimeType() {
+                    return "text/plain";
+                }
+
+                @Override
+                public void flush() throws IOException {
+                }
+
+                @Override
+                public void close() throws IOException {
+                }
+
+                @Override
+                public void beginRecordSet() throws IOException {
+                    currentRecordCount = 0;
+                }
+
+                @Override
+                public WriteResult finishRecordSet() throws IOException {
+                    return WriteResult.of(Math.toIntExact(currentRecordCount), Collections.emptyMap());
+            };
+        }
+    }
 }
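
A note on why the assertions expect two successes and one failure: with
MAX_ROWS_PER_FLOW_FILE and OUTPUT_BATCH_SIZE both set to 1, each of the five
rows is written through a fresh RecordSetWriter and transferred immediately,
and the original input FlowFile is removed after the first batch ships. The
factory permits two writes, so the third throws, and the patched code must
route the failure to a recreated FlowFile; hence two FlowFiles on success, one
on failure, and the retained "retain.attr" attribute on the failure FlowFile.
The write counter is an AtomicInteger held by the factory rather than the
writer because every output FlowFile gets its own writer instance. As a
minimal, hypothetical sanity check of the helper in isolation (the empty
RecordSet suffices because the writer only iterates whatever it is given, and
the logger and schema arguments are ignored by this writer):

    final RecordSet emptyRecords = new RecordSet() {
        @Override public RecordSchema getSchema() { return null; }
        @Override public Record next() { return null; }
    };
    final FailAfterNRecordSetWriterFactory factory = new FailAfterNRecordSetWriterFactory(1);
    final RecordSetWriter writer = factory.createWriter(null, null, OutputStream.nullOutputStream(), Map.of());
    writer.write(emptyRecords);                                        // counts as the one allowed write
    assertThrows(IOException.class, () -> writer.write(emptyRecords)); // next write throws
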
