This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7846408b4a NIFI-14508 Upgraded fastcsv from 2.2.2 to 3.6.0 (#9911)
7846408b4a is described below
commit 7846408b4a9d949690c560e513b575e7b1b0ede8
Author: Pierre Villard <[email protected]>
AuthorDate: Fri May 2 17:43:31 2025 +0200
NIFI-14508 Upgraded fastcsv from 2.2.2 to 3.6.0 (#9911)
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-record-serialization-services/pom.xml | 2 +-
.../org/apache/nifi/csv/FastCSVRecordReader.java | 118 ++++++++++----------
.../org/apache/nifi/csv/WriteFastCSVResult.java | 122 +++++++++------------
3 files changed, 113 insertions(+), 129 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 5b8101816b..235b77bbfb 100755
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -89,7 +89,7 @@
<dependency>
<groupId>de.siegmar</groupId>
<artifactId>fastcsv</artifactId>
- <version>2.2.2</version>
+ <version>3.6.0</version>
</dependency>
<dependency>
<groupId>com.github.palindromicity</groupId>
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
index 510cf89156..e9f53608da 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
@@ -19,7 +19,17 @@ package org.apache.nifi.csv;
import de.siegmar.fastcsv.reader.CommentStrategy;
import de.siegmar.fastcsv.reader.CsvReader;
-import de.siegmar.fastcsv.reader.CsvRow;
+import de.siegmar.fastcsv.reader.CsvRecord;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -33,30 +43,30 @@ import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
public class FastCSVRecordReader extends AbstractCSVRecordReader {
- private final CsvReader csvReader;
- private final Iterator<CsvRow> csvRowIterator;
- private List<RecordField> recordFields;
+ private final CsvReader<CsvRecord> csvReader;
+ private final Iterator<CsvRecord> csvRecordIterator;
+ private List<RecordField> recordFields;
private Map<String, Integer> headerMap;
private final boolean ignoreHeader;
private final boolean trimDoubleQuote;
private final CSVFormat csvFormat;
- public FastCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
- final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding, final boolean trimDoubleQuote) throws IOException {
+ public FastCSVRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final RecordSchema schema,
+ final CSVFormat csvFormat,
+ final boolean hasHeader,
+ final boolean ignoreHeader,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final String encoding,
+ final boolean trimDoubleQuote) throws IOException {
super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat,
timestampFormat, trimDoubleQuote);
this.ignoreHeader = ignoreHeader;
this.trimDoubleQuote = trimDoubleQuote;
@@ -66,8 +76,8 @@ public class FastCSVRecordReader extends
AbstractCSVRecordReader {
.fieldSeparator(csvFormat.getDelimiterString().charAt(0))
.quoteCharacter(csvFormat.getQuoteCharacter())
.commentStrategy(CommentStrategy.SKIP)
- .skipEmptyRows(csvFormat.getIgnoreEmptyLines())
- .errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames());
+ .skipEmptyLines(csvFormat.getIgnoreEmptyLines())
+ .ignoreDifferentFieldCount(csvFormat.getAllowMissingColumnNames());
if (csvFormat.getCommentMarker() != null) {
builder.commentCharacter(csvFormat.getCommentMarker());
@@ -82,23 +92,26 @@ public class FastCSVRecordReader extends
AbstractCSVRecordReader {
}
}
- csvReader = builder.build(new InputStreamReader(in, encoding));
- csvRowIterator = csvReader.iterator();
+ this.csvReader = builder.ofCsvRecord(new InputStreamReader(in,
encoding));
+ this.csvRecordIterator = csvReader.iterator();
}
@Override
- public Record nextRecord(final boolean coerceTypes, final boolean
dropUnknownFields) throws IOException, MalformedRecordException {
+ public Record nextRecord(final boolean coerceTypes, final boolean
dropUnknownFields)
+ throws IOException, MalformedRecordException {
try {
final RecordSchema schema = getSchema();
-
final List<RecordField> recordFields = getRecordFields();
final int numFieldNames = recordFields.size();
- if (!csvRowIterator.hasNext()) {
+
+ if (!csvRecordIterator.hasNext()) {
return null;
}
- final CsvRow csvRecord = csvRowIterator.next();
+
+ final CsvRecord csvRecord = csvRecordIterator.next();
final Map<String, Object> values = new
LinkedHashMap<>(recordFields.size() * 2);
+
for (int i = 0; i < csvRecord.getFieldCount(); i++) {
String rawValue = csvRecord.getField(i);
if (csvFormat.getTrim()) {
@@ -108,28 +121,20 @@ public class FastCSVRecordReader extends
AbstractCSVRecordReader {
rawValue = trim(rawValue);
}
- final String rawFieldName;
- final DataType dataType;
if (i >= numFieldNames) {
if (!dropUnknownFields) {
values.put("unknown_field_index_" + i, rawValue);
}
continue;
- } else {
- final RecordField recordField = recordFields.get(i);
- rawFieldName = recordField.getFieldName();
- dataType = recordField.getDataType();
}
- final Object value;
- if (coerceTypes) {
- value = convert(rawValue, dataType, rawFieldName);
- } else {
- // The CSV Reader is going to return all fields as
Strings, because CSV doesn't have any way to
- // dictate a field type. As a result, we will use the
schema that we have to attempt to convert
- // the value into the desired type if it's a simple type.
- value = convertSimpleIfPossible(rawValue, dataType,
rawFieldName);
- }
+ final RecordField recordField = recordFields.get(i);
+ final String rawFieldName = recordField.getFieldName();
+ final DataType dataType = recordField.getDataType();
+
+ final Object value = coerceTypes
+ ? convert(rawValue, dataType, rawFieldName)
+ : convertSimpleIfPossible(rawValue, dataType,
rawFieldName);
values.putIfAbsent(rawFieldName, value);
}
@@ -140,10 +145,9 @@ public class FastCSVRecordReader extends
AbstractCSVRecordReader {
}
}
-
private List<RecordField> getRecordFields() {
- if (this.recordFields != null) {
- return this.recordFields;
+ if (recordFields != null) {
+ return recordFields;
}
if (ignoreHeader) {
@@ -152,39 +156,33 @@ public class FastCSVRecordReader extends
AbstractCSVRecordReader {
+ "have the same number of fields, as this is not
conformant to RFC-4180");
}
- // When getting the field names from the first record, it has to be
read in
- if (!csvRowIterator.hasNext()) {
+ if (!csvRecordIterator.hasNext()) {
return Collections.emptyList();
}
- CsvRow headerRow = csvRowIterator.next();
+
+ // read header row
+ CsvRecord headerRecord = csvRecordIterator.next();
headerMap = new HashMap<>();
- for (int i = 0; i < headerRow.getFieldCount(); i++) {
- String rawValue = headerRow.getField(i);
+ for (int i = 0; i < headerRecord.getFieldCount(); i++) {
+ String rawValue = headerRecord.getField(i);
if (csvFormat.getTrim()) {
rawValue = rawValue.trim();
}
- if (this.trimDoubleQuote) {
+ if (trimDoubleQuote) {
rawValue = trim(rawValue);
}
headerMap.put(rawValue, i);
}
-
- // Use a SortedMap keyed by index of the field so that we can get a
List of field names in the correct order
- final SortedMap<Integer, String> sortedMap = new TreeMap<>();
- for (final Map.Entry<String, Integer> entry : headerMap.entrySet()) {
+ SortedMap<Integer, String> sortedMap = new TreeMap<>();
+ for (Map.Entry<String, Integer> entry : headerMap.entrySet()) {
sortedMap.put(entry.getValue(), entry.getKey());
}
- final List<RecordField> fields = new ArrayList<>();
- final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
- for (final String rawFieldName : rawFieldNames) {
- final Optional<RecordField> option = schema.getField(rawFieldName);
- if (option.isPresent()) {
- fields.add(option.get());
- } else {
- fields.add(new RecordField(rawFieldName,
RecordFieldType.STRING.getDataType()));
- }
+ List<RecordField> fields = new ArrayList<>();
+ for (String rawFieldName : new ArrayList<>(sortedMap.values())) {
+ Optional<RecordField> optField =
getSchema().getField(rawFieldName);
+ fields.add(optField.orElseGet(() -> new RecordField(rawFieldName,
RecordFieldType.STRING.getDataType())));
}
this.recordFields = fields;
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
index 8722aa20b7..28cadd4d8d 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
@@ -19,14 +19,7 @@ package org.apache.nifi.csv;
import de.siegmar.fastcsv.writer.CsvWriter;
import de.siegmar.fastcsv.writer.LineDelimiter;
-import de.siegmar.fastcsv.writer.QuoteStrategy;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import de.siegmar.fastcsv.writer.QuoteStrategies;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;
import org.apache.nifi.schema.access.SchemaAccessWriter;
@@ -39,6 +32,14 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
import static org.apache.commons.csv.QuoteMode.MINIMAL;
public class WriteFastCSVResult extends AbstractRecordSetWriter implements
RecordSetWriter, RawRecordWriter {
@@ -48,18 +49,23 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
private final String timeFormat;
private final String timestampFormat;
- CsvWriter csvWriter;
-
- //Need to call flush() on the underlying writer
- final OutputStreamWriter streamWriter;
+ private final CsvWriter csvWriter;
+ private final OutputStreamWriter streamWriter;
private final String[] fieldValues;
private final boolean includeHeaderLine;
private boolean headerWritten = false;
private String[] fieldNames;
- public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
- final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException {
+ public WriteFastCSVResult(final CSVFormat csvFormat,
+ final RecordSchema recordSchema,
+ final SchemaAccessWriter schemaWriter,
+ final OutputStream out,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final boolean includeHeaderLine,
+ final String charSet) throws IOException {
super(out);
this.recordSchema = recordSchema;
@@ -69,7 +75,8 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
this.timestampFormat = timestampFormat;
this.includeHeaderLine = includeHeaderLine;
- streamWriter = new OutputStreamWriter(out, charSet);
+ this.streamWriter = new OutputStreamWriter(out, charSet);
+
CsvWriter.CsvWriterBuilder builder = CsvWriter.builder()
.fieldSeparator(csvFormat.getDelimiterString().charAt(0))
.quoteCharacter(csvFormat.getQuoteCharacter());
@@ -77,14 +84,15 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
QuoteMode quoteMode = (csvFormat.getQuoteMode() == null) ? MINIMAL :
csvFormat.getQuoteMode();
switch (quoteMode) {
case ALL:
- builder.quoteStrategy(QuoteStrategy.ALWAYS);
+ builder.quoteStrategy(QuoteStrategies.ALWAYS);
break;
+ case ALL_NON_NULL:
case NON_NUMERIC:
- builder.quoteStrategy(QuoteStrategy.NON_EMPTY);
+ builder.quoteStrategy(QuoteStrategies.NON_EMPTY);
+ break;
+ default:
+ // MINIMAL or NONE → FastCSV's default (required)
break;
- case MINIMAL:
- case NONE:
- builder.quoteStrategy(QuoteStrategy.REQUIRED);
}
try {
@@ -95,22 +103,21 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
}
if (csvFormat.getEscapeCharacter() != null &&
csvFormat.getEscapeCharacter() != '\"') {
- throw new IOException("Escape character must be a double-quote character (\") per the FastCSV conformance to the RFC4180 specification");
+ throw new IOException("Escape character must be a double-quote character (\") per RFC-4180");
}
- csvWriter = builder.build(streamWriter);
- fieldValues = new String[recordSchema.getFieldCount()];
+ this.csvWriter = builder.build(streamWriter);
+ this.fieldValues = new String[recordSchema.getFieldCount()];
}
private String getFormat(final RecordField field) {
final DataType dataType = field.getDataType();
return switch (dataType.getFieldType()) {
- case DATE -> dateFormat;
- case TIME -> timeFormat;
+ case DATE -> dateFormat;
+ case TIME -> timeFormat;
case TIMESTAMP -> timestampFormat;
- default -> dataType.getFormat();
+ default -> dataType.getFormat();
};
-
}
@Override
@@ -120,7 +127,6 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
@Override
protected Map<String, String> onFinishRecordSet() throws IOException {
- // If the header has not yet been written (but should be), write it
out now
includeHeaderIfNecessary(null, true);
return schemaWriter.getAttributes(recordSchema);
}
@@ -139,83 +145,63 @@ public class WriteFastCSVResult extends
AbstractRecordSetWriter implements Recor
if (fieldNames != null) {
return fieldNames;
}
-
- final Set<String> allFields = new LinkedHashSet<>();
- // The fields defined in the schema should be written first followed
by extra ones.
+ Set<String> allFields = new LinkedHashSet<>();
allFields.addAll(recordSchema.getFieldNames());
allFields.addAll(record.getRawFieldNames());
fieldNames = allFields.toArray(new String[0]);
return fieldNames;
}
- private void includeHeaderIfNecessary(final Record record, final boolean
includeOnlySchemaFields) throws IOException {
+ private void includeHeaderIfNecessary(final Record record, final boolean
schemaOnly) throws IOException {
if (headerWritten || !includeHeaderLine) {
return;
}
+ String[] names = schemaOnly
+ ? recordSchema.getFieldNames().toArray(new String[0])
+ : getFieldNames(record);
- final String[] fieldNames;
- if (includeOnlySchemaFields) {
- fieldNames = recordSchema.getFieldNames().toArray(new String[0]);
- } else {
- fieldNames = getFieldNames(record);
- }
-
- csvWriter.writeRow(fieldNames);
+ csvWriter.writeRecord(names);
headerWritten = true;
}
@Override
public Map<String, String> writeRecord(final Record record) throws
IOException {
- // If we are not writing an active record set, then we need to ensure
that we write the
- // schema information.
if (!isActiveRecordSet()) {
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
-
includeHeaderIfNecessary(record, true);
int i = 0;
- for (final RecordField recordField : recordSchema.getFields()) {
- String fieldValue = getFieldValue(record, recordField);
- fieldValues[i++] = fieldValue;
+ for (RecordField field : recordSchema.getFields()) {
+ fieldValues[i++] = record.getAsString(field, getFormat(field));
}
- csvWriter.writeRow(fieldValues);
+ csvWriter.writeRecord(fieldValues);
return schemaWriter.getAttributes(recordSchema);
}
- private String getFieldValue(final Record record, final RecordField
recordField) {
- return record.getAsString(recordField, getFormat(recordField));
- }
-
@Override
public WriteResult writeRawRecord(final Record record) throws IOException {
- // If we are not writing an active record set, then we need to ensure
that we write the
- // schema information.
if (!isActiveRecordSet()) {
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
-
includeHeaderIfNecessary(record, false);
- final String[] fieldNames = getFieldNames(record);
- // Avoid creating a new Object[] for every Record if we can. But if
the record has a different number of columns than does our
- // schema, we don't have a lot of options here, so we just create a
new Object[] in that case.
- final String[] recordFieldValues = (fieldNames.length ==
this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
+ String[] names = getFieldNames(record);
+ String[] values = (names.length == fieldValues.length)
+ ? fieldValues
+ : new String[names.length];
int i = 0;
- for (final String fieldName : fieldNames) {
- final Optional<RecordField> recordField =
recordSchema.getField(fieldName);
- if (recordField.isPresent()) {
- recordFieldValues[i++] = record.getAsString(fieldName,
getFormat(recordField.get()));
- } else {
- recordFieldValues[i++] = record.getAsString(fieldName);
- }
+ for (String name : names) {
+ Optional<RecordField> rf = recordSchema.getField(name);
+ values[i++] = rf
+ .map(f -> record.getAsString(f, getFormat(f)))
+ .orElseGet(() -> record.getAsString(name));
}
- csvWriter.writeRow(recordFieldValues);
- final Map<String, String> attributes =
schemaWriter.getAttributes(recordSchema);
- return WriteResult.of(incrementRecordCount(), attributes);
+ csvWriter.writeRecord(values);
+ return WriteResult.of(incrementRecordCount(),
schemaWriter.getAttributes(recordSchema));
}
@Override