This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3755a64788 NIFI-13531 Fixed ExecuteSQLRecord to honor FlowFile 
attributes for configured RecordWriter (#9909)
3755a64788 is described below

commit 3755a647880426b4e73f9be7fba152eef59b249a
Author: Filip Maretić <[email protected]>
AuthorDate: Fri Oct 24 23:36:25 2025 +0200

    NIFI-13531 Fixed ExecuteSQLRecord to honor FlowFile attributes for 
configured RecordWriter (#9909)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../serialization/record/MockCsvRecordWriter.java  | 171 +++++++++++++++++++++
 .../serialization/record/MockRecordWriter.java     |  44 ++++--
 .../processors/standard/sql/RecordSqlWriter.java   |   2 +-
 .../processors/standard/TestExecuteSQLRecord.java  |  42 +++++
 4 files changed, 247 insertions(+), 12 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
new file mode 100644
index 0000000000..643c013fdc
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockCsvRecordWriter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * {@code MockCsvRecordWriter} is a concrete implementation of a {@link 
RecordSetWriterFactory}
+ * that produces CSV-formatted output. It extends the {@link MockRecordWriter} 
and provides a
+ * fluent {@link Builder} to configure properties such as header inclusion, 
value quoting,
+ * output buffering, and schema specification.
+ * <p>
+ * This class is suitable for use in testing scenarios or simple serialization 
tasks where
+ * CSV output is needed and full schema compliance is not required.
+ *
+ * <p><strong>Example usage:</strong></p>
+ * <pre>{@code
+ * RecordSchema schema = ...; // define schema as needed
+ * MockCsvRecordWriter writer = MockCsvRecordWriter.builder()
+ *     .withHeader("id,name,age")
+ *     .quoteValues(true)
+ *     .bufferOutput(true)
+ *     .withSchema(schema)
+ *     .withSeparator(attributes -> attributes.getOrDefault("csv.separator", 
","))
+ *     .build();
+ * }</pre>
+ */
+public final class MockCsvRecordWriter extends MockRecordWriter {
+
+    /**
+     * Returns a new {@link Builder} instance for constructing a {@code 
MockCsvRecordWriter}.
+     *
+     * @return builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Constructs a MockCsvRecordWriter with the specified configuration.
+     *
+     * @param header optional CSV header line (e.g., "id,name,age"), may be 
{@code null}
+     * @param quoteValues whether to quote individual field values (useful for 
text fields)
+     * @param failAfterN if non-negative, causes writer to throw IOException after 
writing N records (used in testing); use -1 to disable
+     * @param bufferOutput whether to wrap output stream in a {@link 
java.io.BufferedOutputStream}
+     * @param schema optional {@link RecordSchema} used for field name and 
type resolution
+     * @param mapToSeparator a function to dynamically resolve the field 
separator based on FlowFile attributes
+     */
+    private MockCsvRecordWriter(
+            String header,
+            boolean quoteValues,
+            int failAfterN,
+            boolean bufferOutput,
+            RecordSchema schema,
+            Function<Map<String, String>, String> mapToSeparator) {
+        super(header, quoteValues, failAfterN, bufferOutput, schema, 
mapToSeparator);
+    }
+
+    /**
+     * Builder class for fluent construction of {@link MockCsvRecordWriter} 
instances.
+     */
+    public static class Builder {
+        private String header = null;
+        private boolean quoteValues = true;
+        private int failAfterN = -1;
+        private boolean bufferOutput = false;
+        private RecordSchema schema = null;
+        private Function<Map<String, String>, String> mapToSeparator = 
attributes -> DEFAULT_SEPARATOR;
+
+        private Builder() {
+        }
+
+        /**
+         * Specifies a header line to be written before the CSV data.
+         *
+         * @param header the header line (e.g., "id,name,age"); can be {@code 
null} to omit header
+         * @return the current {@code Builder} instance
+         */
+        public Builder withHeader(String header) {
+            this.header = header;
+            return this;
+        }
+
+        /**
+         * Specifies whether field values should be enclosed in double quotes.
+         *
+         * @param quoteValues {@code true} to quote values, {@code false} 
otherwise
+         * @return the current {@code Builder} instance
+         */
+        public Builder quoteValues(boolean quoteValues) {
+            this.quoteValues = quoteValues;
+            return this;
+        }
+
+        /**
+         * Configures the writer to throw an {@link java.io.IOException} after 
writing a given number of records.
+         * Primarily used for testing failure scenarios.
+         *
+         * @param failAfterN the number of records after which to fail; use -1 
to disable
+         * @return the current {@code Builder} instance
+         */
+        public Builder failAfterN(int failAfterN) {
+            this.failAfterN = failAfterN;
+            return this;
+        }
+
+        /**
+         * Specifies whether the output stream should be wrapped in a {@link 
java.io.BufferedOutputStream}.
+         *
+         * @param bufferOutput {@code true} to enable buffering, {@code false} 
to write directly
+         * @return the current {@code Builder} instance
+         */
+        public Builder bufferOutput(boolean bufferOutput) {
+            this.bufferOutput = bufferOutput;
+            return this;
+        }
+
+        /**
+         * Sets the schema to be used for writing records.
+         *
+         * @param schema the {@link RecordSchema}; can be {@code null}, in 
which case {@code getSchema} returns an empty schema
+         * @return the current {@code Builder} instance
+         */
+        public Builder withSchema(RecordSchema schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        /**
+         * Specifies a function to dynamically resolve the field separator 
based on FlowFile attributes.
+         * <p><strong>Example usage:</strong></p>
+         * <pre>{@code
+         *     builder.withSeparator(attributes -> 
attributes.getOrDefault("csv.separator", ","));
+         * }</pre>
+         *
+         * @param mapToSeparator a function that maps FlowFile attributes to a 
separator string
+         * @return the current {@code Builder} instance
+         */
+        public Builder withSeparator(Function<Map<String, String>, String> 
mapToSeparator) {
+            this.mapToSeparator = mapToSeparator;
+            return this;
+        }
+
+
+        /**
+         * Constructs a new {@link MockCsvRecordWriter} instance using the 
configured parameters.
+         *
+         * @return a fully initialized {@link MockCsvRecordWriter}
+         */
+        public MockCsvRecordWriter build() {
+            return new MockCsvRecordWriter(header, quoteValues, failAfterN, 
bufferOutput, schema, mapToSeparator);
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 1398922919..8e10b42dfd 100644
--- 
a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ 
b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -32,14 +32,17 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class MockRecordWriter extends AbstractControllerService implements 
RecordSetWriterFactory {
+    static final String DEFAULT_SEPARATOR = ",";
+
     private final String header;
     private final int failAfterN;
     private final boolean quoteValues;
     private final boolean bufferOutput;
-
     private final RecordSchema writeSchema;
+    private final Function<Map<String, String>, String> mapToSeparator;
 
     public MockRecordWriter() {
         this(null);
@@ -61,28 +64,47 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
         this(header, quoteValues, -1, bufferOutput, null);
     }
 
-    public MockRecordWriter(final String header, final boolean quoteValues, 
final int failAfterN, final boolean bufferOutput, final RecordSchema 
writeSchema) {
+    public MockRecordWriter(final String header,
+                            final boolean quoteValues,
+                            final int failAfterN,
+                            final boolean bufferOutput,
+                            final RecordSchema writeSchema) {
+        this(header, quoteValues, failAfterN, bufferOutput, writeSchema, 
variables -> DEFAULT_SEPARATOR);
+    }
+
+    protected MockRecordWriter(final String header,
+                               final boolean quoteValues,
+                               final int failAfterN,
+                               final boolean bufferOutput,
+                               final RecordSchema writeSchema,
+                               final Function<Map<String, String>, String> 
mapToSeparator) {
         this.header = header;
         this.quoteValues = quoteValues;
         this.failAfterN = failAfterN;
         this.bufferOutput = bufferOutput;
         this.writeSchema = writeSchema;
+        this.mapToSeparator = mapToSeparator;
     }
 
     @Override
-    public RecordSchema getSchema(Map<String, String> variables, RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
+    public RecordSchema getSchema(Map<String, String> variables, RecordSchema 
readSchema)
+            throws SchemaNotFoundException, IOException {
         return (writeSchema != null) ? writeSchema : new 
SimpleRecordSchema(Collections.emptyList());
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final OutputStream rawOut, Map<String, String> variables) {
+    public RecordSetWriter createWriter(
+            final ComponentLog logger,
+            final RecordSchema schema,
+            final OutputStream rawOut,
+            final Map<String, String> variables) {
+        final String separator = mapToSeparator.apply(variables);
         final OutputStream out = bufferOutput ? new 
BufferedOutputStream(rawOut) : rawOut;
 
         return new RecordSetWriter() {
             private int recordCount = 0;
             private boolean headerWritten = false;
-
-            private RecordSchema writerSchema = schema;
+            private final RecordSchema writerSchema = schema;
 
             @Override
             public void flush() throws IOException {
@@ -98,7 +120,7 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
                 }
 
                 int recordCount = 0;
-                Record record = null;
+                Record record;
                 while ((record = rs.next()) != null) {
                     if (++recordCount > failAfterN && failAfterN > -1) {
                         throw new IOException("Unit Test intentionally 
throwing IOException after " + failAfterN + " records were written");
@@ -128,7 +150,7 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
                         }
 
                         if (i++ < numCols - 1) {
-                            out.write(",".getBytes());
+                            out.write(separator.getBytes());
                         }
                     }
                     out.write("\n".getBytes());
@@ -177,7 +199,7 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
                     }
 
                     if (i++ < numCols - 1) {
-                        out.write(",".getBytes());
+                        out.write(separator.getBytes());
                     }
                 }
                 out.write("\n".getBytes());
@@ -191,11 +213,11 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
             }
 
             @Override
-            public void beginRecordSet() throws IOException {
+            public void beginRecordSet() {
             }
 
             @Override
-            public WriteResult finishRecordSet() throws IOException {
+            public WriteResult finishRecordSet() {
                 return WriteResult.of(recordCount, Collections.emptyMap());
             }
         };
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index 18f7f12494..5aa5392fda 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -78,7 +78,7 @@ public class RecordSqlWriter implements SqlWriter {
         } catch (final SQLException | SchemaNotFoundException | IOException e) 
{
             throw new ProcessException(e);
         }
-        try (final RecordSetWriter resultSetWriter = 
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, 
Collections.emptyMap())) {
+        try (final RecordSetWriter resultSetWriter = 
recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, 
originalAttributes)) {
             writeResultRef.set(resultSetWriter.write(recordSet));
             if (mimeType == null) {
                 mimeType = resultSetWriter.getMimeType();
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 9b63a13537..2ae982d52d 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -27,6 +27,7 @@ import 
org.apache.nifi.processors.standard.sql.RecordSqlWriter;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockCsvRecordWriter;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
@@ -157,6 +158,47 @@ public class TestExecuteSQLRecord {
         invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
     }
 
+    @Test
+    public void testFlowFileAttributeResolution() throws 
InitializationException, SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException ignored) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 2; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = MockCsvRecordWriter.builder()
+                .withHeader("foo|bar|baz")
+                .quoteValues(false)
+                .withSeparator(attr -> attr.getOrDefault("csv.separator", ","))
+                .build();
+
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.SQL_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.enqueue("", Map.of("csv.separator", "|"));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).getFirst();
+        out.assertContentEquals("""
+                foo|bar|baz
+                0|1|1
+                1|1|1
+                """);
+    }
+
     @Test
     public void testWithOutputBatching() throws InitializationException, 
SQLException {
         // load test data to database

Reply via email to