This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f5661232e NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastCSV writer implementation
3f5661232e is described below

commit 3f5661232e2e229e97e3ae32139e379de6c755eb
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Tue Sep 12 22:09:49 2023 -0400

    NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastCSV writer implementation
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #7686.
---
 .../org/apache/nifi/csv/CSVRecordSetWriter.java    |  38 +-
 .../org/apache/nifi/csv/WriteFastCSVResult.java    | 229 +++++++++++++
 .../apache/nifi/csv/TestWriteFastCSVResult.java    | 381 +++++++++++++++++++++
 3 files changed, 646 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index ca934a7c9c..7f7b5f26ee 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -21,8 +21,10 @@ import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -43,9 +45,31 @@ import java.util.Map;
     + "corresponding to the record fields.")
 public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
 
+    // CSV writer implementations
+    public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
+            "The CSV writer implementation from the Apache Commons CSV library.");
+
+    public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV",
+            "The CSV writer implementation from the FastCSV library. NOTE: This writer only officially supports RFC-4180, so it is recommended to "
+                    + "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data; in that case, set the 'CSV Format' property to "
+                    + "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this "
+                    + "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Quote Mode'), but otherwise it may produce the expected "
+                    + "output even if the data is not fully RFC-4180 compliant.");
+    public static final PropertyDescriptor CSV_WRITER = new PropertyDescriptor.Builder()
+            .name("csv-writer")
+            .displayName("CSV Writer")
+            .description("Specifies which writer implementation to use to write CSV records. NOTE: Different writers may support different subsets of functionality "
+                    + "and may also exhibit different levels of performance.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(APACHE_COMMONS_CSV, FAST_CSV)
+            .defaultValue(APACHE_COMMONS_CSV.getValue())
+            .required(true)
+            .build();
+
     private volatile ConfigurationContext context;
 
     private volatile boolean includeHeader;
+    private volatile String csvWriter;
     private volatile String charSet;
 
     // it will be initialized only if there are no dynamic csv formatting properties
@@ -55,6 +79,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(CSVUtils.CSV_FORMAT);
+        properties.add(CSV_WRITER);
         properties.add(CSVUtils.VALUE_SEPARATOR);
         properties.add(CSVUtils.INCLUDE_HEADER_LINE);
         properties.add(CSVUtils.QUOTE_CHAR);
@@ -81,6 +106,8 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
         } else {
             this.csvFormat = null;
         }
+
+        this.csvWriter = context.getProperty(CSV_WRITER).getValue();
     }
 
     @Override
@@ -92,7 +119,14 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
             csvFormat = CSVUtils.createCSVFormat(context, variables);
         }
 
-        return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
-            getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
+        if (APACHE_COMMONS_CSV.getValue().equals(csvWriter)) {
+            return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
+                    getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
+        } else if (FAST_CSV.getValue().equals(csvWriter)) {
+            return new WriteFastCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
+                    getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
+        } else {
+            throw new IOException("CSV Writer not supported: " + csvWriter);
+        }
     }
 }
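
The change above routes createWriter() through the new 'CSV Writer' property. As a minimal sketch (not part of this commit) of how a unit test might select the FastCSV implementation, assuming NiFi's TestRunner utilities and a hypothetical MyRecordProcessor that merely hosts the controller service:

    @Test
    public void testSelectFastCsvWriter() throws Exception {
        final TestRunner runner = TestRunners.newTestRunner(MyRecordProcessor.class);
        final CSVRecordSetWriter writerService = new CSVRecordSetWriter();
        runner.addControllerService("csv-writer", writerService);
        // Select the FastCSV implementation using the property and value defined above;
        // all other CSV formatting properties keep their defaults.
        runner.setProperty(writerService, CSVRecordSetWriter.CSV_WRITER, CSVRecordSetWriter.FAST_CSV.getValue());
        runner.enableControllerService(writerService);
    }

Setting 'CSV Format' to 'RFC 4180' alongside this, as the FAST_CSV description recommends, keeps the data within what FastCSV officially supports.
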
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
new file mode 100644
index 0000000000..22a8926ad7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import de.siegmar.fastcsv.writer.CsvWriter;
+import de.siegmar.fastcsv.writer.LineDelimiter;
+import de.siegmar.fastcsv.writer.QuoteStrategy;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RawRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.csv.QuoteMode.MINIMAL;
+
+public class WriteFastCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
+    private final RecordSchema recordSchema;
+    private final SchemaAccessWriter schemaWriter;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    CsvWriter csvWriter;
+
+    // Need to call flush() on the underlying writer
+    final OutputStreamWriter streamWriter;
+
+    private final String[] fieldValues;
+    private final boolean includeHeaderLine;
+    private boolean headerWritten = false;
+    private String[] fieldNames;
+
+    public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
+                                 final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException {
+
+        super(out);
+        this.recordSchema = recordSchema;
+        this.schemaWriter = schemaWriter;
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+        this.includeHeaderLine = includeHeaderLine;
+
+        streamWriter = new OutputStreamWriter(out, charSet);
+        CsvWriter.CsvWriterBuilder builder = CsvWriter.builder()
+                .fieldSeparator(csvFormat.getDelimiter())
+                .quoteCharacter(csvFormat.getQuoteCharacter());
+
+        QuoteMode quoteMode = (csvFormat.getQuoteMode() == null) ? MINIMAL : csvFormat.getQuoteMode();
+        switch (quoteMode) {
+            case ALL:
+                builder.quoteStrategy(QuoteStrategy.ALWAYS);
+                break;
+            case NON_NUMERIC:
+                builder.quoteStrategy(QuoteStrategy.NON_EMPTY);
+                break;
+            case MINIMAL:
+            case NONE:
+                builder.quoteStrategy(QuoteStrategy.REQUIRED);
+        }
+
+        try {
+            LineDelimiter lineDelimiter = LineDelimiter.of(csvFormat.getRecordSeparator());
+            builder.lineDelimiter(lineDelimiter);
+        } catch (IllegalArgumentException iae) {
+            throw new IOException("Line delimiter is not supported, must use 
LF, CR, or CRLF", iae);
+        }
+
+        if (csvFormat.getEscapeCharacter() != null && csvFormat.getEscapeCharacter() != '\"') {
+            throw new IOException("Escape character must be a double-quote character (\") because FastCSV conforms to the RFC 4180 specification");
+        }
+
+        csvWriter = builder.build(streamWriter);
+        fieldValues = new String[recordSchema.getFieldCount()];
+    }
+
+    private String getFormat(final RecordField field) {
+        final DataType dataType = field.getDataType();
+        switch (dataType.getFieldType()) {
+            case DATE:
+                return dateFormat;
+            case TIME:
+                return timeFormat;
+            case TIMESTAMP:
+                return timestampFormat;
+        }
+
+        return dataType.getFormat();
+    }
+
+    @Override
+    protected void onBeginRecordSet() throws IOException {
+        schemaWriter.writeHeader(recordSchema, getOutputStream());
+    }
+
+    @Override
+    protected Map<String, String> onFinishRecordSet() throws IOException {
+        // If the header has not yet been written (but should be), write it out now
+        includeHeaderIfNecessary(null, true);
+        return schemaWriter.getAttributes(recordSchema);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        streamWriter.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        csvWriter.close();
+    }
+
+    private String[] getFieldNames(final Record record) {
+        if (fieldNames != null) {
+            return fieldNames;
+        }
+
+        final Set<String> allFields = new LinkedHashSet<>();
+        // The fields defined in the schema should be written first followed by extra ones.
+        allFields.addAll(recordSchema.getFieldNames());
+        allFields.addAll(record.getRawFieldNames());
+        fieldNames = allFields.toArray(new String[0]);
+        return fieldNames;
+    }
+
+    private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException {
+        if (headerWritten || !includeHeaderLine) {
+            return;
+        }
+
+        final String[] fieldNames;
+        if (includeOnlySchemaFields) {
+            fieldNames = recordSchema.getFieldNames().toArray(new String[0]);
+        } else {
+            fieldNames = getFieldNames(record);
+        }
+
+        csvWriter.writeRow(fieldNames);
+        headerWritten = true;
+    }
+
+    @Override
+    public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
+        includeHeaderIfNecessary(record, true);
+
+        int i = 0;
+        for (final RecordField recordField : recordSchema.getFields()) {
+            String fieldValue = getFieldValue(record, recordField);
+            fieldValues[i++] = fieldValue;
+        }
+
+        csvWriter.writeRow(fieldValues);
+        return schemaWriter.getAttributes(recordSchema);
+    }
+
+    private String getFieldValue(final Record record, final RecordField recordField) {
+        return record.getAsString(recordField, getFormat(recordField));
+    }
+
+    @Override
+    public WriteResult writeRawRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
+        includeHeaderIfNecessary(record, false);
+
+        final String[] fieldNames = getFieldNames(record);
+        // Avoid creating a new String[] for every Record if we can. But if the record has a different number of columns than our
+        // schema does, we don't have a lot of options here, so we just create a new String[] in that case.
+        final String[] recordFieldValues = (fieldNames.length == this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
+
+        int i = 0;
+        for (final String fieldName : fieldNames) {
+            final Optional<RecordField> recordField = recordSchema.getField(fieldName);
+            if (recordField.isPresent()) {
+                recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get()));
+            } else {
+                recordFieldValues[i++] = record.getAsString(fieldName);
+            }
+        }
+
+        csvWriter.writeRow(recordFieldValues);
+        final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
+        return WriteResult.of(incrementRecordCount(), attributes);
+    }
+
+    @Override
+    public String getMimeType() {
+        return "text/csv";
+    }
+}
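
WriteFastCSVResult adapts the Commons CSV configuration to FastCSV's builder: QuoteMode.ALL maps to QuoteStrategy.ALWAYS, NON_NUMERIC to NON_EMPTY, and MINIMAL/NONE to REQUIRED, while the record separator must be LF, CR, or CRLF and the escape character (if set) must be the RFC 4180 double-quote. A standalone sketch of the same builder calls, outside NiFi, with illustrative values:

    import de.siegmar.fastcsv.writer.CsvWriter;
    import de.siegmar.fastcsv.writer.LineDelimiter;
    import de.siegmar.fastcsv.writer.QuoteStrategy;

    import java.io.IOException;
    import java.io.StringWriter;

    public class FastCsvSketch {
        public static void main(final String[] args) throws IOException {
            final StringWriter out = new StringWriter();
            try (CsvWriter csv = CsvWriter.builder()
                    .fieldSeparator(',')
                    .quoteCharacter('"')
                    .quoteStrategy(QuoteStrategy.REQUIRED) // the MINIMAL/NONE mapping above
                    .lineDelimiter(LineDelimiter.LF)
                    .build(out)) {
                csv.writeRow("id", "name");
                csv.writeRow("1\"", "John Doe"); // the embedded quote is doubled per RFC 4180
            }
            System.out.print(out); // id,name / "1""",John Doe (LF-terminated lines)
        }
    }
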
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java
new file mode 100644
index 0000000000..ebe29351fa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.apache.nifi.schema.access.SchemaNameAsAttribute;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestWriteFastCSVResult {
+
+    @Test
+    public void testDataTypes() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.RFC4180.withQuoteMode(QuoteMode.NON_NUMERIC).withRecordSeparator("\n");
+
+        final StringBuilder headerBuilder = new StringBuilder();
+        final List<RecordField> fields = new ArrayList<>();
+        for (final RecordFieldType fieldType : RecordFieldType.values()) {
+            if (fieldType == RecordFieldType.CHOICE) {
+                final List<DataType> possibleTypes = new ArrayList<>();
+                possibleTypes.add(RecordFieldType.INT.getDataType());
+                possibleTypes.add(RecordFieldType.LONG.getDataType());
+
+                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
+            } else {
+                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
+            }
+
+            headerBuilder.append('"').append(fieldType.name().toLowerCase()).append('"').append(",");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final long now = System.currentTimeMillis();
+
+        try (final WriteFastCSVResult result = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) {
+
+            final Map<String, Object> valueMap = new HashMap<>();
+            valueMap.put("string", "a孟bc李12儒3");
+            valueMap.put("boolean", true);
+            valueMap.put("byte", (byte) 1);
+            valueMap.put("char", 'c');
+            valueMap.put("short", (short) 8);
+            valueMap.put("int", 9);
+            valueMap.put("bigint", BigInteger.valueOf(8L));
+            valueMap.put("long", 8L);
+            valueMap.put("float", 8.0F);
+            valueMap.put("double", 8.0D);
+            valueMap.put("decimal", BigDecimal.valueOf(8.1D));
+            valueMap.put("date", new Date(now));
+            valueMap.put("time", new Time(now));
+            valueMap.put("timestamp", new Timestamp(now));
+            valueMap.put("record", null);
+            valueMap.put("choice", 48L);
+            valueMap.put("array", null);
+            valueMap.put("enum", null);
+            valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c");
+
+            final Record record = new MapRecord(schema, valueMap);
+            final RecordSet rs = RecordSet.of(schema, record);
+
+            result.write(rs);
+        }
+
+        final String output = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+
+        headerBuilder.deleteCharAt(headerBuilder.length() - 1);
+        final String headerLine = headerBuilder.toString();
+
+        final String[] splits = output.split("\n");
+        assertEquals(2, splits.length);
+        assertEquals(headerLine, splits[0]);
+
+        final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
+        final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
+        final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
+
+        final String values = splits[1];
+        final StringBuilder expectedBuilder = new StringBuilder();
+        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue +
+                "\",\"" + dateValue + "\",\"" + timeValue + "\",\"8bb20bf2-ec41-4b94-80a4-922f4dba009c\",\"c\",,\"a孟bc李12儒3\",,\"48\",,");
+
+        final String expectedValues = expectedBuilder.toString();
+
+        assertEquals(expectedValues, values);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.write(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id\n1\n", output);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        // The fields defined in the schema should be written first followed by extra ones.
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("name", "John");
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,John\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+
+    @Test
+    public void testMissingAndExtraFieldWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.writeRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldWriteRawRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.writeRawRecord(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name,dob\n1,,1/1/1970\n", output);
+    }
+
+    @Test
+    public void testEscapeCharInValueWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuote('\"').withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1\"");
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.write(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n\"1\"\"\",John Doe\n", output);
+    }
+
+    @Test
+    public void testEmptyEscapeCharWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.RFC4180.withEscape(null).withQuote('\"').withRecordSeparator("\n");
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1\"");
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.write(record);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        // The first field is quoted as the Quote Mode defaults to MINIMAL and it has an escaped character in it
+        assertEquals("id,name\n\"1\"\"\",John Doe\n", output);
+    }
+
+    @Test
+    public void testWriteHeaderWithNoRecords() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.builder().setEscape('\"').setQuoteMode(QuoteMode.NONE).setRecordSeparator("\n").build();
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
+
+            writer.beginRecordSet();
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString();
+        }
+
+        assertEquals("id,name\n", output);
+    }
+
+
+    private DateFormat getDateFormat(final String format) {
+        final DateFormat df = new SimpleDateFormat(format);
+        return df;
+    }
+
+
+}
\ No newline at end of file
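
Because WriteCSVResult shares the constructor signature exercised throughout these tests, a comparative test is easy to sketch. The following is not part of this commit; it only illustrates how one might verify that both writer implementations agree on simple, fully RFC-4180-compliant data (it reuses this class's imports plus org.apache.nifi.serialization.RecordSetWriter):

    @Test
    public void testWritersAgreeOnSimpleData() throws IOException {
        final CSVFormat csvFormat = CSVFormat.RFC4180.withRecordSeparator("\n");
        final List<RecordField> fields = new ArrayList<>();
        fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
        final RecordSchema schema = new SimpleRecordSchema(fields);

        final Map<String, Object> values = new LinkedHashMap<>();
        values.put("id", "1");
        values.put("name", "John Doe");

        final String[] outputs = new String[2];
        for (int i = 0; i < outputs.length; i++) {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Index 0 exercises the Commons CSV writer, index 1 the FastCSV writer
            try (final RecordSetWriter writer = (i == 0)
                    ? new WriteCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
                            RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")
                    : new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
                            RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) {
                writer.beginRecordSet();
                writer.write(new MapRecord(schema, values));
                writer.finishRecordSet();
                writer.flush();
            }
            outputs[i] = baos.toString("ASCII");
        }

        assertEquals("id,name\n1,John Doe\n", outputs[1]);
        assertEquals(outputs[0], outputs[1]);
    }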
