This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3755a64788 NIFI-13531 Fixed ExecuteSQLRecord to honor FlowFile
attributes for configured RecordWriter (#9909)
3755a64788 is described below
commit 3755a647880426b4e73f9be7fba152eef59b249a
Author: Filip Maretić <[email protected]>
AuthorDate: Fri Oct 24 23:36:25 2025 +0200
NIFI-13531 Fixed ExecuteSQLRecord to honor FlowFile attributes for
configured RecordWriter (#9909)
Signed-off-by: David Handermann <[email protected]>
---
.../serialization/record/MockCsvRecordWriter.java | 171 +++++++++++++++++++++
.../serialization/record/MockRecordWriter.java | 44 ++++--
.../processors/standard/sql/RecordSqlWriter.java | 2 +-
.../processors/standard/TestExecuteSQLRecord.java | 42 +++++
4 files changed, 247 insertions(+), 12 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
new file mode 100644
index 0000000000..643c013fdc
--- /dev/null
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * {@code MockCsvRecordWriter} is a concrete implementation of a {@link
RecordSetWriterFactory}
+ * that produces CSV-formatted output. It extends the {@link MockRecordWriter}
and provides a
+ * fluent {@link Builder} to configure properties such as header inclusion,
value quoting,
+ * output buffering, and schema specification.
+ * <p>
+ * This class is suitable for use in testing scenarios or simple serialization
tasks where
+ * CSV output is needed and full schema compliance is not required.
+ *
+ * <p><strong>Example usage:</strong></p>
+ * <pre>{@code
+ * RecordSchema schema = ...; // define schema as needed
+ * MockCsvRecordWriter writer = MockCsvRecordWriter.builder()
+ * .withHeader("id,name,age")
+ * .quoteValues(true)
+ * .bufferOutput(true)
+ * .withSchema(schema)
+ * .withSeparator(attributes -> attributes.getOrDefault("csv.separator",
","))
+ * .build();
+ * }</pre>
+ */
+public final class MockCsvRecordWriter extends MockRecordWriter {
+
+ /**
+ * Returns a new {@link Builder} instance for constructing a {@code
MockCsvRecordWriter}.
+ *
+ * @return builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Constructs a MockCsvRecordWriter with the specified configuration.
+ *
+ * @param header optional CSV header line (e.g., "id,name,age"), may be
{@code null}
+ * @param quoteValues whether to quote individual field values (useful for
text fields)
+ * @param failAfterN if positive, causes writer to throw IOException after
writing N records (used in testing)
+ * @param bufferOutput whether to wrap output stream in a {@link
java.io.BufferedOutputStream}
+ * @param schema optional {@link RecordSchema} used for field name and
type resolution
+ * @param mapToSeparator a function to dynamically resolve the field
separator based on FlowFile attributes
+ */
+ private MockCsvRecordWriter(
+ String header,
+ boolean quoteValues,
+ int failAfterN,
+ boolean bufferOutput,
+ RecordSchema schema,
+ Function<Map<String, String>, String> mapToSeparator) {
+ super(header, quoteValues, failAfterN, bufferOutput, schema,
mapToSeparator);
+ }
+
+ /**
+ * Builder class for fluent construction of {@link MockCsvRecordWriter}
instances.
+ */
+ public static class Builder {
+ private String header = null;
+ private boolean quoteValues = true;
+ private int failAfterN = -1;
+ private boolean bufferOutput = false;
+ private RecordSchema schema = null;
+ private Function<Map<String, String>, String> mapToSeparator =
attributes -> DEFAULT_SEPARATOR;
+
+ private Builder() {
+ }
+
+ /**
+ * Specifies a header line to be written before the CSV data.
+ *
+ * @param header the header line (e.g., "id,name,age"); can be {@code
null} to omit header
+ * @return the current {@code Builder} instance
+ */
+ public Builder withHeader(String header) {
+ this.header = header;
+ return this;
+ }
+
+ /**
+ * Specifies whether field values should be enclosed in double quotes.
+ *
+ * @param quoteValues {@code true} to quote values, {@code false}
otherwise
+ * @return the current {@code Builder} instance
+ */
+ public Builder quoteValues(boolean quoteValues) {
+ this.quoteValues = quoteValues;
+ return this;
+ }
+
+ /**
+ * Configures the writer to throw an {@link java.io.IOException} after
writing a given number of records.
+ * Primarily used for testing failure scenarios.
+ *
+ * @param failAfterN the number of records after which to fail; use -1
to disable
+ * @return the current {@code Builder} instance
+ */
+ public Builder failAfterN(int failAfterN) {
+ this.failAfterN = failAfterN;
+ return this;
+ }
+
+ /**
+ * Specifies whether the output stream should be wrapped in a {@link
java.io.BufferedOutputStream}.
+ *
+ * @param bufferOutput {@code true} to enable buffering, {@code false}
to write directly
+ * @return the current {@code Builder} instance
+ */
+ public Builder bufferOutput(boolean bufferOutput) {
+ this.bufferOutput = bufferOutput;
+ return this;
+ }
+
+ /**
+ * Sets the schema to be used for writing records.
+ *
+ * @param schema the {@link RecordSchema}; can be {@code null} if
schema inference is acceptable
+ * @return the current {@code Builder} instance
+ */
+ public Builder withSchema(RecordSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ /**
+ * Specifies a function to dynamically resolve the field separator
based on FlowFile attributes.
+ * <p><strong>Example usage:</strong></p>
+ * <pre>{@code
+ * builder.withSeparator(attributes ->
attributes.getOrDefault("csv.separator", ","));
+ * }</pre>
+ *
+ * @param mapToSeparator a function that maps FlowFile attributes to a
separator string
+ * @return the current {@code Builder} instance
+ */
+ public Builder withSeparator(Function<Map<String, String>, String>
mapToSeparator) {
+ this.mapToSeparator = mapToSeparator;
+ return this;
+ }
+
+
+ /**
+ * Constructs a new {@link MockCsvRecordWriter} instance using the
configured parameters.
+ *
+ * @return a fully initialized {@link MockCsvRecordWriter}
+ */
+ public MockCsvRecordWriter build() {
+ return new MockCsvRecordWriter(header, quoteValues, failAfterN,
bufferOutput, schema, mapToSeparator);
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 1398922919..8e10b42dfd 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -32,14 +32,17 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
public class MockRecordWriter extends AbstractControllerService implements
RecordSetWriterFactory {
+ static final String DEFAULT_SEPARATOR = ",";
+
private final String header;
private final int failAfterN;
private final boolean quoteValues;
private final boolean bufferOutput;
-
private final RecordSchema writeSchema;
+ private final Function<Map<String, String>, String> mapToSeparator;
public MockRecordWriter() {
this(null);
@@ -61,28 +64,47 @@ public class MockRecordWriter extends
AbstractControllerService implements Recor
this(header, quoteValues, -1, bufferOutput, null);
}
- public MockRecordWriter(final String header, final boolean quoteValues,
final int failAfterN, final boolean bufferOutput, final RecordSchema
writeSchema) {
+ public MockRecordWriter(final String header,
+ final boolean quoteValues,
+ final int failAfterN,
+ final boolean bufferOutput,
+ final RecordSchema writeSchema) {
+ this(header, quoteValues, failAfterN, bufferOutput, writeSchema,
variables -> DEFAULT_SEPARATOR);
+ }
+
+ protected MockRecordWriter(final String header,
+ final boolean quoteValues,
+ final int failAfterN,
+ final boolean bufferOutput,
+ final RecordSchema writeSchema,
+ final Function<Map<String, String>, String>
mapToSeparator) {
this.header = header;
this.quoteValues = quoteValues;
this.failAfterN = failAfterN;
this.bufferOutput = bufferOutput;
this.writeSchema = writeSchema;
+ this.mapToSeparator = mapToSeparator;
}
@Override
- public RecordSchema getSchema(Map<String, String> variables, RecordSchema
readSchema) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(Map<String, String> variables, RecordSchema
readSchema)
+ throws SchemaNotFoundException, IOException {
return (writeSchema != null) ? writeSchema : new
SimpleRecordSchema(Collections.emptyList());
}
@Override
- public RecordSetWriter createWriter(final ComponentLog logger, final
RecordSchema schema, final OutputStream rawOut, Map<String, String> variables) {
+ public RecordSetWriter createWriter(
+ final ComponentLog logger,
+ final RecordSchema schema,
+ final OutputStream rawOut,
+ final Map<String, String> variables) {
+ final String separator = mapToSeparator.apply(variables);
final OutputStream out = bufferOutput ? new
BufferedOutputStream(rawOut) : rawOut;
return new RecordSetWriter() {
private int recordCount = 0;
private boolean headerWritten = false;
-
- private RecordSchema writerSchema = schema;
+ private final RecordSchema writerSchema = schema;
@Override
public void flush() throws IOException {
@@ -98,7 +120,7 @@ public class MockRecordWriter extends
AbstractControllerService implements Recor
}
int recordCount = 0;
- Record record = null;
+ Record record;
while ((record = rs.next()) != null) {
if (++recordCount > failAfterN && failAfterN > -1) {
throw new IOException("Unit Test intentionally
throwing IOException after " + failAfterN + " records were written");
@@ -128,7 +150,7 @@ public class MockRecordWriter extends
AbstractControllerService implements Recor
}
if (i++ < numCols - 1) {
- out.write(",".getBytes());
+ out.write(separator.getBytes());
}
}
out.write("\n".getBytes());
@@ -177,7 +199,7 @@ public class MockRecordWriter extends
AbstractControllerService implements Recor
}
if (i++ < numCols - 1) {
- out.write(",".getBytes());
+ out.write(separator.getBytes());
}
}
out.write("\n".getBytes());
@@ -191,11 +213,11 @@ public class MockRecordWriter extends
AbstractControllerService implements Recor
}
@Override
- public void beginRecordSet() throws IOException {
+ public void beginRecordSet() {
}
@Override
- public WriteResult finishRecordSet() throws IOException {
+ public WriteResult finishRecordSet() {
return WriteResult.of(recordCount, Collections.emptyMap());
}
};
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index 18f7f12494..5aa5392fda 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -78,7 +78,7 @@ public class RecordSqlWriter implements SqlWriter {
} catch (final SQLException | SchemaNotFoundException | IOException e)
{
throw new ProcessException(e);
}
- try (final RecordSetWriter resultSetWriter =
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream,
Collections.emptyMap())) {
+ try (final RecordSetWriter resultSetWriter =
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream,
originalAttributes)) {
writeResultRef.set(resultSetWriter.write(recordSet));
if (mimeType == null) {
mimeType = resultSetWriter.getMimeType();
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 9b63a13537..2ae982d52d 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -27,6 +27,7 @@ import
org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockCsvRecordWriter;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
@@ -157,6 +158,47 @@ public class TestExecuteSQLRecord {
invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
}
+ @Test
+ public void testFlowFileAttributeResolution() throws
InitializationException, SQLException {
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException ignored) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 2; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES ("
+ i + ", 1, 1)");
+ }
+
+ MockRecordWriter recordWriter = MockCsvRecordWriter.builder()
+ .withHeader("foo|bar|baz")
+ .quoteValues(false)
+ .withSeparator(attr -> attr.getOrDefault("csv.separator", ","))
+ .build();
+
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.enqueue("", Map.of("csv.separator", "|"));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+ MockFlowFile out =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).getFirst();
+ out.assertContentEquals("""
+ foo|bar|baz
+ 0|1|1
+ 1|1|1
+ """);
+ }
+
@Test
public void testWithOutputBatching() throws InitializationException,
SQLException {
// load test data to database