This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7846408b4a NIFI-14508 Upgraded fastcsv from 2.2.2 to 3.6.0 (#9911)
7846408b4a is described below

commit 7846408b4a9d949690c560e513b575e7b1b0ede8
Author: Pierre Villard <[email protected]>
AuthorDate: Fri May 2 17:43:31 2025 +0200

    NIFI-14508 Upgraded fastcsv from 2.2.2 to 3.6.0 (#9911)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-record-serialization-services/pom.xml     |   2 +-
 .../org/apache/nifi/csv/FastCSVRecordReader.java   | 118 ++++++++++----------
 .../org/apache/nifi/csv/WriteFastCSVResult.java    | 122 +++++++++------------
 3 files changed, 113 insertions(+), 129 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 5b8101816b..235b77bbfb 100755
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -89,7 +89,7 @@
         <dependency>
             <groupId>de.siegmar</groupId>
             <artifactId>fastcsv</artifactId>
-            <version>2.2.2</version>
+            <version>3.6.0</version>
         </dependency>
         <dependency>
             <groupId>com.github.palindromicity</groupId>
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
index 510cf89156..e9f53608da 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
@@ -19,7 +19,17 @@ package org.apache.nifi.csv;
 
 import de.siegmar.fastcsv.reader.CommentStrategy;
 import de.siegmar.fastcsv.reader.CsvReader;
-import de.siegmar.fastcsv.reader.CsvRow;
+import de.siegmar.fastcsv.reader.CsvRecord;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -33,30 +43,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
 
 public class FastCSVRecordReader extends AbstractCSVRecordReader {
-    private final CsvReader csvReader;
-    private final Iterator<CsvRow> csvRowIterator;
 
-    private List<RecordField> recordFields;
+    private final CsvReader<CsvRecord> csvReader;
+    private final Iterator<CsvRecord> csvRecordIterator;
 
+    private List<RecordField> recordFields;
     private Map<String, Integer> headerMap;
 
     private final boolean ignoreHeader;
     private final boolean trimDoubleQuote;
     private final CSVFormat csvFormat;
 
-    public FastCSVRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean 
hasHeader, final boolean ignoreHeader,
-                               final String dateFormat, final String 
timeFormat, final String timestampFormat, final String encoding, final boolean 
trimDoubleQuote) throws IOException {
+    public FastCSVRecordReader(final InputStream in,
+            final ComponentLog logger,
+            final RecordSchema schema,
+            final CSVFormat csvFormat,
+            final boolean hasHeader,
+            final boolean ignoreHeader,
+            final String dateFormat,
+            final String timeFormat,
+            final String timestampFormat,
+            final String encoding,
+            final boolean trimDoubleQuote) throws IOException {
         super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, 
timestampFormat, trimDoubleQuote);
         this.ignoreHeader = ignoreHeader;
         this.trimDoubleQuote = trimDoubleQuote;
@@ -66,8 +76,8 @@ public class FastCSVRecordReader extends 
AbstractCSVRecordReader {
             .fieldSeparator(csvFormat.getDelimiterString().charAt(0))
                 .quoteCharacter(csvFormat.getQuoteCharacter())
                 .commentStrategy(CommentStrategy.SKIP)
-                .skipEmptyRows(csvFormat.getIgnoreEmptyLines())
-                
.errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames());
+                .skipEmptyLines(csvFormat.getIgnoreEmptyLines())
+                
.ignoreDifferentFieldCount(csvFormat.getAllowMissingColumnNames());
 
         if (csvFormat.getCommentMarker() != null) {
             builder.commentCharacter(csvFormat.getCommentMarker());
@@ -82,23 +92,26 @@ public class FastCSVRecordReader extends 
AbstractCSVRecordReader {
             }
         }
 
-        csvReader = builder.build(new InputStreamReader(in, encoding));
-        csvRowIterator = csvReader.iterator();
+        this.csvReader = builder.ofCsvRecord(new InputStreamReader(in, 
encoding));
+        this.csvRecordIterator = csvReader.iterator();
     }
 
     @Override
-    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields)
+            throws IOException, MalformedRecordException {
 
         try {
             final RecordSchema schema = getSchema();
-
             final List<RecordField> recordFields = getRecordFields();
             final int numFieldNames = recordFields.size();
-            if (!csvRowIterator.hasNext()) {
+
+            if (!csvRecordIterator.hasNext()) {
                 return null;
             }
-            final CsvRow csvRecord = csvRowIterator.next();
+
+            final CsvRecord csvRecord = csvRecordIterator.next();
             final Map<String, Object> values = new 
LinkedHashMap<>(recordFields.size() * 2);
+
             for (int i = 0; i < csvRecord.getFieldCount(); i++) {
                 String rawValue = csvRecord.getField(i);
                 if (csvFormat.getTrim()) {
@@ -108,28 +121,20 @@ public class FastCSVRecordReader extends 
AbstractCSVRecordReader {
                     rawValue = trim(rawValue);
                 }
 
-                final String rawFieldName;
-                final DataType dataType;
                 if (i >= numFieldNames) {
                     if (!dropUnknownFields) {
                         values.put("unknown_field_index_" + i, rawValue);
                     }
                     continue;
-                } else {
-                    final RecordField recordField = recordFields.get(i);
-                    rawFieldName = recordField.getFieldName();
-                    dataType = recordField.getDataType();
                 }
 
-                final Object value;
-                if (coerceTypes) {
-                    value = convert(rawValue, dataType, rawFieldName);
-                } else {
-                    // The CSV Reader is going to return all fields as 
Strings, because CSV doesn't have any way to
-                    // dictate a field type. As a result, we will use the 
schema that we have to attempt to convert
-                    // the value into the desired type if it's a simple type.
-                    value = convertSimpleIfPossible(rawValue, dataType, 
rawFieldName);
-                }
+                final RecordField recordField = recordFields.get(i);
+                final String rawFieldName = recordField.getFieldName();
+                final DataType dataType = recordField.getDataType();
+
+                final Object value = coerceTypes
+                        ? convert(rawValue, dataType, rawFieldName)
+                        : convertSimpleIfPossible(rawValue, dataType, 
rawFieldName);
 
                 values.putIfAbsent(rawFieldName, value);
             }
@@ -140,10 +145,9 @@ public class FastCSVRecordReader extends 
AbstractCSVRecordReader {
         }
     }
 
-
     private List<RecordField> getRecordFields() {
-        if (this.recordFields != null) {
-            return this.recordFields;
+        if (recordFields != null) {
+            return recordFields;
         }
 
         if (ignoreHeader) {
@@ -152,39 +156,33 @@ public class FastCSVRecordReader extends 
AbstractCSVRecordReader {
                     + "have the same number of fields, as this is not 
conformant to RFC-4180");
         }
 
-        // When getting the field names from the first record, it has to be 
read in
-        if (!csvRowIterator.hasNext()) {
+        if (!csvRecordIterator.hasNext()) {
             return Collections.emptyList();
         }
-        CsvRow headerRow = csvRowIterator.next();
+
+        // read header row
+        CsvRecord headerRecord = csvRecordIterator.next();
         headerMap = new HashMap<>();
-        for (int i = 0; i < headerRow.getFieldCount(); i++) {
-            String rawValue = headerRow.getField(i);
+        for (int i = 0; i < headerRecord.getFieldCount(); i++) {
+            String rawValue = headerRecord.getField(i);
             if (csvFormat.getTrim()) {
                 rawValue = rawValue.trim();
             }
-            if (this.trimDoubleQuote) {
+            if (trimDoubleQuote) {
                 rawValue = trim(rawValue);
             }
             headerMap.put(rawValue, i);
         }
 
-
-        // Use a SortedMap keyed by index of the field so that we can get a 
List of field names in the correct order
-        final SortedMap<Integer, String> sortedMap = new TreeMap<>();
-        for (final Map.Entry<String, Integer> entry : headerMap.entrySet()) {
+        SortedMap<Integer, String> sortedMap = new TreeMap<>();
+        for (Map.Entry<String, Integer> entry : headerMap.entrySet()) {
             sortedMap.put(entry.getValue(), entry.getKey());
         }
 
-        final List<RecordField> fields = new ArrayList<>();
-        final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
-        for (final String rawFieldName : rawFieldNames) {
-            final Optional<RecordField> option = schema.getField(rawFieldName);
-            if (option.isPresent()) {
-                fields.add(option.get());
-            } else {
-                fields.add(new RecordField(rawFieldName, 
RecordFieldType.STRING.getDataType()));
-            }
+        List<RecordField> fields = new ArrayList<>();
+        for (String rawFieldName : new ArrayList<>(sortedMap.values())) {
+            Optional<RecordField> optField = 
getSchema().getField(rawFieldName);
+            fields.add(optField.orElseGet(() -> new RecordField(rawFieldName, 
RecordFieldType.STRING.getDataType())));
         }
 
         this.recordFields = fields;
diff --git 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
index 8722aa20b7..28cadd4d8d 100644
--- 
a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
+++ 
b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
@@ -19,14 +19,7 @@ package org.apache.nifi.csv;
 
 import de.siegmar.fastcsv.writer.CsvWriter;
 import de.siegmar.fastcsv.writer.LineDelimiter;
-import de.siegmar.fastcsv.writer.QuoteStrategy;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import de.siegmar.fastcsv.writer.QuoteStrategies;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.QuoteMode;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
@@ -39,6 +32,14 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
 import static org.apache.commons.csv.QuoteMode.MINIMAL;
 
 public class WriteFastCSVResult extends AbstractRecordSetWriter implements 
RecordSetWriter, RawRecordWriter {
@@ -48,18 +49,23 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
     private final String timeFormat;
     private final String timestampFormat;
 
-    CsvWriter csvWriter;
-
-    //Need to call flush() on the underlying writer
-    final OutputStreamWriter streamWriter;
+    private final CsvWriter csvWriter;
+    private final OutputStreamWriter streamWriter;
 
     private final String[] fieldValues;
     private final boolean includeHeaderLine;
     private boolean headerWritten = false;
     private String[] fieldNames;
 
-    public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
-                                 final String dateFormat, final String 
timeFormat, final String timestampFormat, final boolean includeHeaderLine, 
final String charSet) throws IOException {
+    public WriteFastCSVResult(final CSVFormat csvFormat,
+            final RecordSchema recordSchema,
+            final SchemaAccessWriter schemaWriter,
+            final OutputStream out,
+            final String dateFormat,
+            final String timeFormat,
+            final String timestampFormat,
+            final boolean includeHeaderLine,
+            final String charSet) throws IOException {
 
         super(out);
         this.recordSchema = recordSchema;
@@ -69,7 +75,8 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
         this.timestampFormat = timestampFormat;
         this.includeHeaderLine = includeHeaderLine;
 
-        streamWriter = new OutputStreamWriter(out, charSet);
+        this.streamWriter = new OutputStreamWriter(out, charSet);
+
         CsvWriter.CsvWriterBuilder builder = CsvWriter.builder()
             .fieldSeparator(csvFormat.getDelimiterString().charAt(0))
                 .quoteCharacter(csvFormat.getQuoteCharacter());
@@ -77,14 +84,15 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
         QuoteMode quoteMode = (csvFormat.getQuoteMode() == null) ? MINIMAL : 
csvFormat.getQuoteMode();
         switch (quoteMode) {
             case ALL:
-                builder.quoteStrategy(QuoteStrategy.ALWAYS);
+                builder.quoteStrategy(QuoteStrategies.ALWAYS);
                 break;
+            case ALL_NON_NULL:
             case NON_NUMERIC:
-                builder.quoteStrategy(QuoteStrategy.NON_EMPTY);
+                builder.quoteStrategy(QuoteStrategies.NON_EMPTY);
+                break;
+            default:
+                // MINIMAL or NONE → FastCSV's default (required)
                 break;
-            case MINIMAL:
-            case NONE:
-                builder.quoteStrategy(QuoteStrategy.REQUIRED);
         }
 
         try {
@@ -95,22 +103,21 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
         }
 
         if (csvFormat.getEscapeCharacter() != null && 
csvFormat.getEscapeCharacter() != '\"') {
-            throw new IOException("Escape character must be a double-quote 
character (\") per the FastCSV conformance to the RFC4180 specification");
+            throw new IOException("Escape character must be a double-quote 
character (\") per RFC-4180");
         }
 
-        csvWriter = builder.build(streamWriter);
-        fieldValues = new String[recordSchema.getFieldCount()];
+        this.csvWriter = builder.build(streamWriter);
+        this.fieldValues = new String[recordSchema.getFieldCount()];
     }
 
     private String getFormat(final RecordField field) {
         final DataType dataType = field.getDataType();
         return switch (dataType.getFieldType()) {
-            case DATE -> dateFormat;
-            case TIME -> timeFormat;
+        case DATE -> dateFormat;
+        case TIME -> timeFormat;
             case TIMESTAMP -> timestampFormat;
-            default -> dataType.getFormat();
+        default -> dataType.getFormat();
         };
-
     }
 
     @Override
@@ -120,7 +127,6 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
 
     @Override
     protected Map<String, String> onFinishRecordSet() throws IOException {
-        // If the header has not yet been written (but should be), write it 
out now
         includeHeaderIfNecessary(null, true);
         return schemaWriter.getAttributes(recordSchema);
     }
@@ -139,83 +145,63 @@ public class WriteFastCSVResult extends 
AbstractRecordSetWriter implements Recor
         if (fieldNames != null) {
             return fieldNames;
         }
-
-        final Set<String> allFields = new LinkedHashSet<>();
-        // The fields defined in the schema should be written first followed 
by extra ones.
+        Set<String> allFields = new LinkedHashSet<>();
         allFields.addAll(recordSchema.getFieldNames());
         allFields.addAll(record.getRawFieldNames());
         fieldNames = allFields.toArray(new String[0]);
         return fieldNames;
     }
 
-    private void includeHeaderIfNecessary(final Record record, final boolean 
includeOnlySchemaFields) throws IOException {
+    private void includeHeaderIfNecessary(final Record record, final boolean 
schemaOnly) throws IOException {
         if (headerWritten || !includeHeaderLine) {
             return;
         }
+        String[] names = schemaOnly
+                ? recordSchema.getFieldNames().toArray(new String[0])
+                : getFieldNames(record);
 
-        final String[] fieldNames;
-        if (includeOnlySchemaFields) {
-            fieldNames = recordSchema.getFieldNames().toArray(new String[0]);
-        } else {
-            fieldNames = getFieldNames(record);
-        }
-
-        csvWriter.writeRow(fieldNames);
+        csvWriter.writeRecord(names);
         headerWritten = true;
     }
 
     @Override
     public Map<String, String> writeRecord(final Record record) throws 
IOException {
-        // If we are not writing an active record set, then we need to ensure 
that we write the
-        // schema information.
         if (!isActiveRecordSet()) {
             schemaWriter.writeHeader(recordSchema, getOutputStream());
         }
-
         includeHeaderIfNecessary(record, true);
 
         int i = 0;
-        for (final RecordField recordField : recordSchema.getFields()) {
-            String fieldValue = getFieldValue(record, recordField);
-            fieldValues[i++] = fieldValue;
+        for (RecordField field : recordSchema.getFields()) {
+            fieldValues[i++] = record.getAsString(field, getFormat(field));
         }
 
-        csvWriter.writeRow(fieldValues);
+        csvWriter.writeRecord(fieldValues);
         return schemaWriter.getAttributes(recordSchema);
     }
 
-    private String getFieldValue(final Record record, final RecordField 
recordField) {
-        return record.getAsString(recordField, getFormat(recordField));
-    }
-
     @Override
     public WriteResult writeRawRecord(final Record record) throws IOException {
-        // If we are not writing an active record set, then we need to ensure 
that we write the
-        // schema information.
         if (!isActiveRecordSet()) {
             schemaWriter.writeHeader(recordSchema, getOutputStream());
         }
-
         includeHeaderIfNecessary(record, false);
 
-        final String[] fieldNames = getFieldNames(record);
-        // Avoid creating a new Object[] for every Record if we can. But if 
the record has a different number of columns than does our
-        // schema, we don't have a lot of options here, so we just create a 
new Object[] in that case.
-        final String[] recordFieldValues = (fieldNames.length == 
this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
+        String[] names = getFieldNames(record);
+        String[] values = (names.length == fieldValues.length)
+                ? fieldValues
+                : new String[names.length];
 
         int i = 0;
-        for (final String fieldName : fieldNames) {
-            final Optional<RecordField> recordField = 
recordSchema.getField(fieldName);
-            if (recordField.isPresent()) {
-                recordFieldValues[i++] = record.getAsString(fieldName, 
getFormat(recordField.get()));
-            } else {
-                recordFieldValues[i++] = record.getAsString(fieldName);
-            }
+        for (String name : names) {
+            Optional<RecordField> rf = recordSchema.getField(name);
+            values[i++] = rf
+                    .map(f -> record.getAsString(f, getFormat(f)))
+                    .orElseGet(() -> record.getAsString(name));
         }
 
-        csvWriter.writeRow(recordFieldValues);
-        final Map<String, String> attributes = 
schemaWriter.getAttributes(recordSchema);
-        return WriteResult.of(incrementRecordCount(), attributes);
+        csvWriter.writeRecord(values);
+        return WriteResult.of(incrementRecordCount(), 
schemaWriter.getAttributes(recordSchema));
     }
 
     @Override

Reply via email to