This is an automated email from the ASF dual-hosted git repository.
mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 821ad8fe86 NIFI-15169 Ensure ExecuteSQLRecord routes streaming failures
821ad8fe86 is described below
commit 821ad8fe860873048a4b6d385839c17eeb87825f
Author: Rajmund Takacs <[email protected]>
AuthorDate: Mon Dec 1 15:04:07 2025 +0100
NIFI-15169 Ensure ExecuteSQLRecord routes streaming failures
Signed-off-by: Mark Bathori <[email protected]>
This closes #10584.
---
.../processors/standard/AbstractExecuteSQL.java | 22 +++--
.../processors/standard/TestExecuteSQLRecord.java | 103 +++++++++++++++++++++
2 files changed, 119 insertions(+), 6 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 29bd87d134..64b5350b1d 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -248,6 +248,7 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
}
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+ Map<String, String> retainedFlowFileAttributes = null;
final ComponentLog logger = getLogger();
final int queryTimeout =
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
@@ -404,8 +405,10 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
// If we've reached the batch size, send
out the flow files
if (outputBatchSize > 0 &&
resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles,
REL_SUCCESS);
- // Need to remove the original input
file if it exists
+ // Need to remove the original input
file if it exists,
+ // but save the attributes to add them
to the failure flow file if it is created
if (fileToProcess != null) {
+ retainedFlowFileAttributes = new
HashMap<>(fileToProcess.getAttributes());
session.remove(fileToProcess);
fileToProcess = null;
}
@@ -492,20 +495,27 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original
file, but if we had no results then
// pass the original flow file down the line to trigger
downstream processors
- if (fileToProcess == null) {
+ // or create a new empty flow file with the original attributes
if the original file no longer exists
+ FlowFile failureFlowFile = fileToProcess;
+ if (failureFlowFile == null && retainedFlowFileAttributes != null)
{
+ failureFlowFile = session.create();
+ failureFlowFile = session.putAllAttributes(failureFlowFile,
retainedFlowFileAttributes);
+ }
+
+ if (failureFlowFile == null) {
// This can happen if any exceptions occur while setting up
the connection, statement, etc.
logger.error("Unable to execute SQL select query [{}]. No
FlowFile to route to failure", selectQuery, e);
context.yield();
} else {
if (context.hasIncomingConnection()) {
- logger.error("Unable to execute SQL select query [{}] for
{} routing to failure", selectQuery, fileToProcess, e);
- fileToProcess = session.penalize(fileToProcess);
+ logger.error("Unable to execute SQL select query [{}] for
{} routing to failure", selectQuery, failureFlowFile, e);
+ failureFlowFile = session.penalize(failureFlowFile);
} else {
logger.error("Unable to execute SQL select query [{}]
routing to failure", selectQuery, e);
context.yield();
}
- session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE,
e.getMessage());
- session.transfer(fileToProcess, REL_FAILURE);
+ session.putAttribute(failureFlowFile, RESULT_ERROR_MESSAGE,
e.getMessage());
+ session.transfer(failureFlowFile, REL_FAILURE);
}
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index b035fb8f28..77da600b30 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@@ -24,9 +25,14 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MockCsvRecordWriter;
import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -38,6 +44,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -46,9 +54,11 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -418,6 +428,35 @@ class TestExecuteSQLRecord extends
AbstractDatabaseConnectionServiceTest {
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS,
0);
}
+ @Test
+ public void
testStreamingExceptionWhileOutputBatchingRoutesOriginalFlowFileToFailure()
throws Exception {
+ executeSql("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+ for (int i = 0; i < 5; i++) {
+ executeSql("insert into TEST_NULL_INT (id, val1, val2) VALUES (" +
i + ", 1, 1)");
+ }
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(AbstractExecuteSQL.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "1");
+ runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+
+ FailAfterNRecordSetWriterFactory recordWriter = new
FailAfterNRecordSetWriterFactory(2);
+ runner.addControllerService("fail-writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY,
"fail-writer");
+ runner.enableControllerService(recordWriter);
+
+ runner.enqueue("SELECT * FROM TEST_NULL_INT", Map.of("retain.attr",
"retain-value"));
+ runner.run();
+
+ runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 2);
+ runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1);
+
+ MockFlowFile failedFlowFile =
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).getFirst();
+ final String errorMessage =
failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE);
+ assertTrue(errorMessage.endsWith("Simulated failure after 2 successful
writes"), () -> "Unexpected error message: " + errorMessage);
+ failedFlowFile.assertAttributeEquals("retain.attr", "retain-value");
+ }
+
public void invokeOnTriggerRecords(final Integer queryTimeout, final
String query, final boolean incomingFlowFile, final Map<String, String> attrs,
final boolean setQueryProperty)
throws InitializationException, SQLException {
@@ -626,4 +665,68 @@ class TestExecuteSQLRecord extends
AbstractDatabaseConnectionServiceTest {
}
}
}
+
+ private static final class FailAfterNRecordSetWriterFactory extends
AbstractControllerService implements RecordSetWriterFactory {
+ private final AtomicInteger successfulWrites = new AtomicInteger(0);
+ private final int failAfter;
+
+ FailAfterNRecordSetWriterFactory(final int failAfter) {
+ this.failAfter = failAfter;
+ }
+
+ @Override
+ public RecordSchema getSchema(final Map<String, String> variables,
final RecordSchema readSchema) {
+ return readSchema;
+ }
+
+ @Override
+ public RecordSetWriter createWriter(final ComponentLog logger, final
RecordSchema schema, final OutputStream out, final Map<String, String>
variables) {
+ return new RecordSetWriter() {
+ private long currentRecordCount;
+
+ @Override
+ public WriteResult write(final RecordSet recordSet) throws
IOException {
+ if (successfulWrites.get() >= failAfter) {
+ throw new IOException("Simulated failure after " +
failAfter + " successful writes");
+ }
+
+ long count = 0;
+ while (recordSet.next() != null) {
+ count++;
+ }
+ currentRecordCount = count;
+ successfulWrites.incrementAndGet();
+ return WriteResult.of(Math.toIntExact(count),
Collections.emptyMap());
+ }
+
+ @Override
+ public WriteResult write(final Record record) throws
IOException {
+ throw new UnsupportedOperationException("Record-by-record
writes not supported");
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/plain";
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void beginRecordSet() throws IOException {
+ currentRecordCount = 0;
+ }
+
+ @Override
+ public WriteResult finishRecordSet() throws IOException {
+ return WriteResult.of(Math.toIntExact(currentRecordCount),
Collections.emptyMap());
+ }
+ };
+ }
+ }
}