This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 3f5661232e NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastCSV writer implementation 3f5661232e is described below commit 3f5661232e2e229e97e3ae32139e379de6c755eb Author: Matt Burgess <mattyb...@apache.org> AuthorDate: Tue Sep 12 22:09:49 2023 -0400 NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastCSV writer implementation Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #7686. --- .../org/apache/nifi/csv/CSVRecordSetWriter.java | 38 +- .../org/apache/nifi/csv/WriteFastCSVResult.java | 229 +++++++++++++ .../apache/nifi/csv/TestWriteFastCSVResult.java | 381 +++++++++++++++++++++ 3 files changed, 646 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index ca934a7c9c..7f7b5f26ee 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -21,8 +21,10 @@ import org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; 
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; @@ -43,9 +45,31 @@ import java.util.Map; + "corresponding to the record fields.") public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { + // CSV writer implementations + public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV", + "The CSV writer implementation from the Apache Commons CSV library."); + + public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV", + "The CSV writer implementation from the FastCSV library. NOTE: This writer only officially supports RFC-4180, so it is recommended to " + + "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data; for that case, set the 'CSV Format' property to " + + "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this " + + "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Quote Mode'), but otherwise may process the output as expected even " + + "if the data is not fully RFC-4180 compliant."); + public static final PropertyDescriptor CSV_WRITER = new PropertyDescriptor.Builder() + .name("csv-writer") + .displayName("CSV Writer") + .description("Specifies which writer implementation to use to write CSV records. 
NOTE: Different writers may support different subsets of functionality " + + "and may also exhibit different levels of performance.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(APACHE_COMMONS_CSV, FAST_CSV) + .defaultValue(APACHE_COMMONS_CSV.getValue()) + .required(true) + .build(); + private volatile ConfigurationContext context; private volatile boolean includeHeader; + private volatile String csvWriter; private volatile String charSet; // it will be initialized only if there are no dynamic csv formatting properties @@ -55,6 +79,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(CSVUtils.CSV_FORMAT); + properties.add(CSV_WRITER); properties.add(CSVUtils.VALUE_SEPARATOR); properties.add(CSVUtils.INCLUDE_HEADER_LINE); properties.add(CSVUtils.QUOTE_CHAR); @@ -81,6 +106,8 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R } else { this.csvFormat = null; } + + this.csvWriter = context.getProperty(CSV_WRITER).getValue(); } @Override @@ -92,7 +119,14 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R csvFormat = CSVUtils.createCSVFormat(context, variables); } - return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out, - getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); + if (APACHE_COMMONS_CSV.getValue().equals(csvWriter)) { + return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out, + getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); + } else if (FAST_CSV.getValue().equals(csvWriter)) { + return new WriteFastCSVResult(csvFormat, schema, 
getSchemaAccessWriter(schema, variables), out, + getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); + } else { + throw new IOException("Parser not supported"); + } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java new file mode 100644 index 0000000000..22a8926ad7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import de.siegmar.fastcsv.writer.CsvWriter; +import de.siegmar.fastcsv.writer.LineDelimiter; +import de.siegmar.fastcsv.writer.QuoteStrategy; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; +import org.apache.nifi.schema.access.SchemaAccessWriter; +import org.apache.nifi.serialization.AbstractRecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RawRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.commons.csv.QuoteMode.MINIMAL; + +public class WriteFastCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter { + private final RecordSchema recordSchema; + private final SchemaAccessWriter schemaWriter; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; + + CsvWriter csvWriter; + + //Need to call flush() on the underlying writer + final OutputStreamWriter streamWriter; + + private final String[] fieldValues; + private final boolean includeHeaderLine; + private boolean headerWritten = false; + private String[] fieldNames; + + public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out, + final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine, final String charSet) throws IOException { + + super(out); + this.recordSchema = recordSchema; + 
this.schemaWriter = schemaWriter; + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; + this.includeHeaderLine = includeHeaderLine; + + streamWriter = new OutputStreamWriter(out, charSet); + CsvWriter.CsvWriterBuilder builder = CsvWriter.builder() + .fieldSeparator(csvFormat.getDelimiter()) + .quoteCharacter(csvFormat.getQuoteCharacter()); + + QuoteMode quoteMode = (csvFormat.getQuoteMode() == null) ? MINIMAL : csvFormat.getQuoteMode(); + switch (quoteMode) { + case ALL: + builder.quoteStrategy(QuoteStrategy.ALWAYS); + break; + case NON_NUMERIC: + builder.quoteStrategy(QuoteStrategy.NON_EMPTY); + break; + case MINIMAL: + case NONE: + builder.quoteStrategy(QuoteStrategy.REQUIRED); + } + + try { + LineDelimiter lineDelimiter = LineDelimiter.of(csvFormat.getRecordSeparator()); + builder.lineDelimiter(lineDelimiter); + } catch (IllegalArgumentException iae) { + throw new IOException("Line delimiter is not supported, must use LF, CR, or CRLF", iae); + } + + if (csvFormat.getEscapeCharacter() != null && csvFormat.getEscapeCharacter() != '\"') { + throw new IOException("Escape character must be a double-quote character (\") per the FastCSV conformance to the RFC4180 specification"); + } + + csvWriter = builder.build(streamWriter); + fieldValues = new String[recordSchema.getFieldCount()]; + } + + private String getFormat(final RecordField field) { + final DataType dataType = field.getDataType(); + switch (dataType.getFieldType()) { + case DATE: + return dateFormat; + case TIME: + return timeFormat; + case TIMESTAMP: + return timestampFormat; + } + + return dataType.getFormat(); + } + + @Override + protected void onBeginRecordSet() throws IOException { + schemaWriter.writeHeader(recordSchema, getOutputStream()); + } + + @Override + protected Map<String, String> onFinishRecordSet() throws IOException { + // If the header has not yet been written (but should be), write it out now + includeHeaderIfNecessary(null, true); + 
return schemaWriter.getAttributes(recordSchema); + } + + @Override + public void flush() throws IOException { + streamWriter.flush(); + } + + @Override + public void close() throws IOException { + csvWriter.close(); + } + + private String[] getFieldNames(final Record record) { + if (fieldNames != null) { + return fieldNames; + } + + final Set<String> allFields = new LinkedHashSet<>(); + // The fields defined in the schema should be written first followed by extra ones. + allFields.addAll(recordSchema.getFieldNames()); + allFields.addAll(record.getRawFieldNames()); + fieldNames = allFields.toArray(new String[0]); + return fieldNames; + } + + private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException { + if (headerWritten || !includeHeaderLine) { + return; + } + + final String[] fieldNames; + if (includeOnlySchemaFields) { + fieldNames = recordSchema.getFieldNames().toArray(new String[0]); + } else { + fieldNames = getFieldNames(record); + } + + csvWriter.writeRow(fieldNames); + headerWritten = true; + } + + @Override + public Map<String, String> writeRecord(final Record record) throws IOException { + // If we are not writing an active record set, then we need to ensure that we write the + // schema information. 
+ if (!isActiveRecordSet()) { + schemaWriter.writeHeader(recordSchema, getOutputStream()); + } + + includeHeaderIfNecessary(record, true); + + int i = 0; + for (final RecordField recordField : recordSchema.getFields()) { + String fieldValue = getFieldValue(record, recordField); + fieldValues[i++] = fieldValue; + } + + csvWriter.writeRow(fieldValues); + return schemaWriter.getAttributes(recordSchema); + } + + private String getFieldValue(final Record record, final RecordField recordField) { + return record.getAsString(recordField, getFormat(recordField)); + } + + @Override + public WriteResult writeRawRecord(final Record record) throws IOException { + // If we are not writing an active record set, then we need to ensure that we write the + // schema information. + if (!isActiveRecordSet()) { + schemaWriter.writeHeader(recordSchema, getOutputStream()); + } + + includeHeaderIfNecessary(record, false); + + final String[] fieldNames = getFieldNames(record); + // Avoid creating a new Object[] for every Record if we can. But if the record has a different number of columns than does our + // schema, we don't have a lot of options here, so we just create a new Object[] in that case. + final String[] recordFieldValues = (fieldNames.length == this.fieldValues.length) ? 
this.fieldValues : new String[fieldNames.length]; + + int i = 0; + for (final String fieldName : fieldNames) { + final Optional<RecordField> recordField = recordSchema.getField(fieldName); + if (recordField.isPresent()) { + recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get())); + } else { + recordFieldValues[i++] = record.getAsString(fieldName); + } + } + + csvWriter.writeRow(recordFieldValues); + final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema); + return WriteResult.of(incrementRecordCount(), attributes); + } + + @Override + public String getMimeType() { + return "text/csv"; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java new file mode 100644 index 0000000000..ebe29351fa --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; +import org.apache.nifi.schema.access.SchemaNameAsAttribute; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestWriteFastCSVResult { + + @Test + public void testDataTypes() throws IOException { + final CSVFormat csvFormat = CSVFormat.RFC4180.withQuoteMode(QuoteMode.NON_NUMERIC).withRecordSeparator("\n"); + + final StringBuilder headerBuilder = new StringBuilder(); + final List<RecordField> fields = new ArrayList<>(); + for (final RecordFieldType fieldType : RecordFieldType.values()) { + if (fieldType == RecordFieldType.CHOICE) { + final List<DataType> possibleTypes = new ArrayList<>(); + possibleTypes.add(RecordFieldType.INT.getDataType()); + possibleTypes.add(RecordFieldType.LONG.getDataType()); + + fields.add(new RecordField(fieldType.name().toLowerCase(), 
fieldType.getChoiceDataType(possibleTypes))); + } else { + fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType())); + } + + headerBuilder.append('"').append(fieldType.name().toLowerCase()).append('"').append(","); + } + final RecordSchema schema = new SimpleRecordSchema(fields); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final long now = System.currentTimeMillis(); + + try (final WriteFastCSVResult result = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "UTF-8")) { + + final Map<String, Object> valueMap = new HashMap<>(); + valueMap.put("string", "a孟bc李12儒3"); + valueMap.put("boolean", true); + valueMap.put("byte", (byte) 1); + valueMap.put("char", 'c'); + valueMap.put("short", (short) 8); + valueMap.put("int", 9); + valueMap.put("bigint", BigInteger.valueOf(8L)); + valueMap.put("long", 8L); + valueMap.put("float", 8.0F); + valueMap.put("double", 8.0D); + valueMap.put("decimal", BigDecimal.valueOf(8.1D)); + valueMap.put("date", new Date(now)); + valueMap.put("time", new Time(now)); + valueMap.put("timestamp", new Timestamp(now)); + valueMap.put("record", null); + valueMap.put("choice", 48L); + valueMap.put("array", null); + valueMap.put("enum", null); + valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c"); + + final Record record = new MapRecord(schema, valueMap); + final RecordSet rs = RecordSet.of(schema, record); + + result.write(rs); + } + + final String output = new String(baos.toByteArray(), StandardCharsets.UTF_8); + + headerBuilder.deleteCharAt(headerBuilder.length() - 1); + final String headerLine = headerBuilder.toString(); + + final String[] splits = output.split("\n"); + assertEquals(2, splits.length); + assertEquals(headerLine, splits[0]); + + final String dateValue = 
getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); + final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); + final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now); + + final String values = splits[1]; + final StringBuilder expectedBuilder = new StringBuilder(); + expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"8bb20bf2-ec41-4b94-80a4-922f4dba009c\",\"c\",,\"a孟bc李12儒3\",,\"48\",,"); + + final String expectedValues = expectedBuilder.toString(); + + assertEquals(expectedValues, values); + } + + @Test + public void testExtraFieldInWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("id", "1"); + values.put("name", "John"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.write(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id\n1\n", output); + } + + @Test + public void testExtraFieldInWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + 
final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + // The fields defined in the schema should be written first followed by extra ones. + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("name", "John"); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,John\n", output); + } + + @Test + public void testMissingFieldWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + 
writer.writeRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + @Test + public void testMissingFieldWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + + @Test + public void testMissingAndExtraFieldWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.writeRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n1,\n", output); + } + + @Test + public void testMissingAndExtraFieldWriteRawRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1"); + values.put("dob", "1/1/1970"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.writeRawRecord(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name,dob\n1,,1/1/1970\n", output); + } + + @Test + public void testEscapeCharInValueWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuote("\"".charAt(0)).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1\""); + values.put("name", "John Doe"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.write(record); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n\"1\"\"\",John Doe\n", output); + } + + @Test + public void testEmptyEscapeCharWriteRecord() throws IOException { + final CSVFormat csvFormat = CSVFormat.RFC4180.withEscape(null).withQuote("\"".charAt(0)).withRecordSeparator("\n"); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new LinkedHashMap<>(); + values.put("id", "1\""); + values.put("name", "John Doe"); + final Record record = new MapRecord(schema, values); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.write(record); + writer.finishRecordSet(); + 
writer.flush(); + output = baos.toString(); + } + + // The first field is quoted as the Quote Mode defaults to MINIMAL and it has an escaped character in it + assertEquals("id,name\n\"1\"\"\",John Doe\n", output); + } + + @Test + public void testWriteHeaderWithNoRecords() throws IOException { + final CSVFormat csvFormat = CSVFormat.DEFAULT.builder().setEscape('\"').setQuoteMode(QuoteMode.NONE).setRecordSeparator("\n").build(); + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String output; + try (final WriteFastCSVResult writer = new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, "ASCII")) { + + writer.beginRecordSet(); + writer.finishRecordSet(); + writer.flush(); + output = baos.toString(); + } + + assertEquals("id,name\n", output); + } + + + private DateFormat getDateFormat(final String format) { + final DateFormat df = new SimpleDateFormat(format); + return df; + } + + +} \ No newline at end of file