This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 3f5661232e NIFI-12024: Add CSV Writer property to CSVRecordSetWriter
and a FastCSV writer implementation
3f5661232e is described below
commit 3f5661232e2e229e97e3ae32139e379de6c755eb
Author: Matt Burgess <[email protected]>
AuthorDate: Tue Sep 12 22:09:49 2023 -0400
NIFI-12024: Add CSV Writer property to CSVRecordSetWriter and a FastCSV
writer implementation
Signed-off-by: Pierre Villard <[email protected]>
This closes #7686.
---
.../org/apache/nifi/csv/CSVRecordSetWriter.java | 38 +-
.../org/apache/nifi/csv/WriteFastCSVResult.java | 229 +++++++++++++
.../apache/nifi/csv/TestWriteFastCSVResult.java | 381 +++++++++++++++++++++
3 files changed, 646 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index ca934a7c9c..7f7b5f26ee 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -21,8 +21,10 @@ import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -43,9 +45,31 @@ import java.util.Map;
+ "corresponding to the record fields.")
public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements
RecordSetWriterFactory {
+    // CSV writer implementations
+
+    /** Selects the Apache Commons CSV based writer ({@link WriteCSVResult}). */
+    public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
+            "The CSV writer implementation from the Apache Commons CSV library.");
+
+    /** Selects the FastCSV based writer ({@link WriteFastCSVResult}), which targets RFC-4180 output. */
+    public static final AllowableValue FAST_CSV = new AllowableValue("fast-csv", "FastCSV",
+            "The CSV writer implementation from the FastCSV library. NOTE: This writer only officially supports RFC-4180, so it is recommended to "
+                    + "set the 'CSV Format' property to 'RFC 4180'. It does handle some non-compliant CSV data; in that case set the 'CSV Format' property to "
+                    + "'CUSTOM' and the other custom format properties (such as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this "
+                    + "may cause errors if FastCSV doesn't handle the property settings correctly (such as 'Quote Mode'), but otherwise may process the output as expected even "
+                    + "if the data is not fully RFC-4180 compliant.");
+
+    /**
+     * Chooses which CSV writer implementation creates the {@code RecordSetWriter}; defaults to Apache Commons CSV
+     * so existing configurations keep their current behavior.
+     */
+    public static final PropertyDescriptor CSV_WRITER = new PropertyDescriptor.Builder()
+            .name("csv-writer")
+            .displayName("CSV Writer")
+            .description("Specifies which writer implementation to use to write CSV records. NOTE: Different writers may support different subsets of functionality "
+                    + "and may also exhibit different levels of performance.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(APACHE_COMMONS_CSV, FAST_CSV)
+            .defaultValue(APACHE_COMMONS_CSV.getValue())
+            .required(true)
+            .build();
+
private volatile ConfigurationContext context;
private volatile boolean includeHeader;
+ private volatile String csvWriter;
private volatile String charSet;
// it will be initialized only if there are no dynamic csv formatting
properties
@@ -55,6 +79,7 @@ public class CSVRecordSetWriter extends
DateTimeTextRecordSetWriter implements R
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(CSVUtils.CSV_FORMAT);
+ properties.add(CSV_WRITER);
properties.add(CSVUtils.VALUE_SEPARATOR);
properties.add(CSVUtils.INCLUDE_HEADER_LINE);
properties.add(CSVUtils.QUOTE_CHAR);
@@ -81,6 +106,8 @@ public class CSVRecordSetWriter extends
DateTimeTextRecordSetWriter implements R
} else {
this.csvFormat = null;
}
+
+ this.csvWriter = context.getProperty(CSV_WRITER).getValue();
}
@Override
@@ -92,7 +119,14 @@ public class CSVRecordSetWriter extends
DateTimeTextRecordSetWriter implements R
csvFormat = CSVUtils.createCSVFormat(context, variables);
}
- return new WriteCSVResult(csvFormat, schema,
getSchemaAccessWriter(schema, variables), out,
- getDateFormat().orElse(null), getTimeFormat().orElse(null),
getTimestampFormat().orElse(null), includeHeader, charSet);
+        // Dispatch on the configured CSV Writer property value (csvWriter) to the matching implementation
+        if (APACHE_COMMONS_CSV.getValue().equals(csvWriter)) {
+            return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
+                    getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
+        } else if (FAST_CSV.getValue().equals(csvWriter)) {
+            return new WriteFastCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
+                    getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
+        } else {
+            // Reached only when csvWriter matches neither allowable value (e.g. it was never initialized);
+            // report the offending value so the misconfiguration can be diagnosed
+            throw new IOException("CSV Writer not supported: " + csvWriter);
+        }
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
new file mode 100644
index 0000000000..22a8926ad7
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteFastCSVResult.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import de.siegmar.fastcsv.writer.CsvWriter;
+import de.siegmar.fastcsv.writer.LineDelimiter;
+import de.siegmar.fastcsv.writer.QuoteStrategy;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RawRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.commons.csv.QuoteMode.MINIMAL;
+
+/**
+ * {@link RecordSetWriter} that writes records as CSV using the FastCSV library.
+ * <p>
+ * Only the subset of {@link CSVFormat} settings that FastCSV can express is honored: the delimiter, the quote character,
+ * the quote mode (mapped to the closest FastCSV {@link QuoteStrategy}) and a record separator of LF, CR or CRLF.
+ * An escape character other than the double quote is rejected, because FastCSV escapes quotes by doubling them (RFC 4180).
+ */
+public class WriteFastCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
+    private static final String UNSUPPORTED_LINE_DELIMITER_MESSAGE = "Line delimiter is not supported, must use LF, CR, or CRLF";
+
+    private final RecordSchema recordSchema;
+    private final SchemaAccessWriter schemaWriter;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    final CsvWriter csvWriter;
+
+    // Need to call flush() on the underlying writer: it buffers characters before they reach the output stream
+    final OutputStreamWriter streamWriter;
+
+    // Reused for every row whose column count matches the schema, to avoid a per-record allocation
+    private final String[] fieldValues;
+    private final boolean includeHeaderLine;
+    private boolean headerWritten = false;
+    // Column names used for raw records; computed from the first raw record and then cached
+    private String[] fieldNames;
+
+    /**
+     * Creates a writer that emits CSV to {@code out}.
+     *
+     * @param csvFormat         the CSV settings to translate into FastCSV settings
+     * @param recordSchema      the schema whose fields define the written columns
+     * @param schemaWriter      writes schema information (header bytes and/or attributes)
+     * @param out               the destination stream
+     * @param dateFormat        format for DATE fields, may be {@code null}
+     * @param timeFormat        format for TIME fields, may be {@code null}
+     * @param timestampFormat   format for TIMESTAMP fields, may be {@code null}
+     * @param includeHeaderLine whether a column-name header row is written
+     * @param charSet           name of the character set used to encode the output
+     * @throws IOException if the charset is unsupported or {@code csvFormat} uses a setting FastCSV cannot honor
+     *                     (no quote character, an escape character other than {@code "}, or a record separator other than LF, CR or CRLF)
+     */
+    public WriteFastCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
+                              final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine,
+                              final String charSet) throws IOException {
+
+        super(out);
+        this.recordSchema = recordSchema;
+        this.schemaWriter = schemaWriter;
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+        this.includeHeaderLine = includeHeaderLine;
+
+        // Validate everything up front so unsupported settings fail with a descriptive IOException
+        final Character quoteCharacter = csvFormat.getQuoteCharacter();
+        if (quoteCharacter == null) {
+            // Unboxing a null quote character would otherwise throw a NullPointerException
+            throw new IOException("A quote character must be specified when using the FastCSV writer");
+        }
+
+        final Character escapeCharacter = csvFormat.getEscapeCharacter();
+        if (escapeCharacter != null && escapeCharacter != '\"') {
+            throw new IOException("Escape character must be a double-quote character (\") per the FastCSV conformance to the RFC4180 specification");
+        }
+
+        final LineDelimiter lineDelimiter = toLineDelimiter(csvFormat.getRecordSeparator());
+
+        streamWriter = new OutputStreamWriter(out, charSet);
+        final CsvWriter.CsvWriterBuilder builder = CsvWriter.builder()
+                .fieldSeparator(csvFormat.getDelimiter())
+                .quoteCharacter(quoteCharacter);
+        builder.quoteStrategy(toQuoteStrategy(csvFormat.getQuoteMode()));
+        builder.lineDelimiter(lineDelimiter);
+
+        csvWriter = builder.build(streamWriter);
+        fieldValues = new String[recordSchema.getFieldCount()];
+    }
+
+    /**
+     * Maps a Commons CSV quote mode onto the closest FastCSV quote strategy; a {@code null} mode is treated as MINIMAL.
+     * ALL_NON_NULL and NON_NUMERIC have no exact FastCSV equivalent, so both quote every non-empty value.
+     */
+    private static QuoteStrategy toQuoteStrategy(final QuoteMode configuredQuoteMode) {
+        final QuoteMode quoteMode = (configuredQuoteMode == null) ? MINIMAL : configuredQuoteMode;
+        switch (quoteMode) {
+            case ALL:
+                return QuoteStrategy.ALWAYS;
+            case ALL_NON_NULL:
+            case NON_NUMERIC:
+                return QuoteStrategy.NON_EMPTY;
+            case MINIMAL:
+            case NONE:
+            default:
+                return QuoteStrategy.REQUIRED;
+        }
+    }
+
+    /** Converts a record separator into a FastCSV line delimiter, rejecting anything other than LF, CR or CRLF. */
+    private static LineDelimiter toLineDelimiter(final String recordSeparator) throws IOException {
+        // Guard null explicitly instead of relying on how LineDelimiter.of treats it
+        if (recordSeparator == null) {
+            throw new IOException(UNSUPPORTED_LINE_DELIMITER_MESSAGE);
+        }
+        try {
+            return LineDelimiter.of(recordSeparator);
+        } catch (final IllegalArgumentException iae) {
+            throw new IOException(UNSUPPORTED_LINE_DELIMITER_MESSAGE, iae);
+        }
+    }
+
+    /** Returns the configured format for DATE/TIME/TIMESTAMP fields, otherwise the data type's own format. */
+    private String getFormat(final RecordField field) {
+        final DataType dataType = field.getDataType();
+        switch (dataType.getFieldType()) {
+            case DATE:
+                return dateFormat;
+            case TIME:
+                return timeFormat;
+            case TIMESTAMP:
+                return timestampFormat;
+            default:
+                return dataType.getFormat();
+        }
+    }
+
+    /**
+     * Writes the schema header directly to the raw output stream. Rows already handed to the character writer are
+     * flushed first, so the header bytes cannot overtake CSV data still held in its buffer.
+     */
+    private void writeSchemaHeader() throws IOException {
+        streamWriter.flush();
+        schemaWriter.writeHeader(recordSchema, getOutputStream());
+    }
+
+    @Override
+    protected void onBeginRecordSet() throws IOException {
+        writeSchemaHeader();
+    }
+
+    @Override
+    protected Map<String, String> onFinishRecordSet() throws IOException {
+        // If the header has not yet been written (but should be), write it out now, e.g. for an empty record set
+        includeHeaderIfNecessary(null, true);
+        return schemaWriter.getAttributes(recordSchema);
+    }
+
+    /** Flushes buffered characters from the stream writer through to the output stream. */
+    @Override
+    public void flush() throws IOException {
+        streamWriter.flush();
+    }
+
+    /** Closes the FastCSV writer. */
+    @Override
+    public void close() throws IOException {
+        csvWriter.close();
+    }
+
+    /** Returns the schema field names followed by any extra raw field names of {@code record}; cached after the first call. */
+    private String[] getFieldNames(final Record record) {
+        if (fieldNames != null) {
+            return fieldNames;
+        }
+
+        final Set<String> allFields = new LinkedHashSet<>();
+        // The fields defined in the schema should be written first followed by extra ones.
+        allFields.addAll(recordSchema.getFieldNames());
+        allFields.addAll(record.getRawFieldNames());
+        fieldNames = allFields.toArray(new String[0]);
+        return fieldNames;
+    }
+
+    /**
+     * Writes the column-name header row once, if enabled.
+     *
+     * @param record                  the record used to derive column names when not limited to schema fields; may be {@code null} otherwise
+     * @param includeOnlySchemaFields {@code true} to use only the schema's field names
+     */
+    private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException {
+        if (headerWritten || !includeHeaderLine) {
+            return;
+        }
+
+        final String[] headerNames = includeOnlySchemaFields
+                ? recordSchema.getFieldNames().toArray(new String[0])
+                : getFieldNames(record);
+
+        csvWriter.writeRow(headerNames);
+        headerWritten = true;
+    }
+
+    /** Writes one row containing exactly the schema's fields; extra record fields are ignored. */
+    @Override
+    public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the schema information.
+        if (!isActiveRecordSet()) {
+            writeSchemaHeader();
+        }
+
+        includeHeaderIfNecessary(record, true);
+
+        int i = 0;
+        for (final RecordField recordField : recordSchema.getFields()) {
+            fieldValues[i++] = getFieldValue(record, recordField);
+        }
+
+        csvWriter.writeRow(fieldValues);
+        return schemaWriter.getAttributes(recordSchema);
+    }
+
+    private String getFieldValue(final Record record, final RecordField recordField) {
+        return record.getAsString(recordField, getFormat(recordField));
+    }
+
+    /** Writes one row containing the schema fields followed by the extra raw fields of the record. */
+    @Override
+    public WriteResult writeRawRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the schema information.
+        if (!isActiveRecordSet()) {
+            writeSchemaHeader();
+        }
+
+        includeHeaderIfNecessary(record, false);
+
+        final String[] columnNames = getFieldNames(record);
+        // Avoid creating a new String[] for every Record if we can. But if the record has a different number of columns than does our
+        // schema, we don't have a lot of options here, so we just create a new String[] in that case.
+        final String[] recordFieldValues = (columnNames.length == this.fieldValues.length) ? this.fieldValues : new String[columnNames.length];
+
+        int i = 0;
+        for (final String fieldName : columnNames) {
+            final Optional<RecordField> recordField = recordSchema.getField(fieldName);
+            if (recordField.isPresent()) {
+                recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get()));
+            } else {
+                recordFieldValues[i++] = record.getAsString(fieldName);
+            }
+        }
+
+        csvWriter.writeRow(recordFieldValues);
+        final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
+        return WriteResult.of(incrementRecordCount(), attributes);
+    }
+
+    @Override
+    public String getMimeType() {
+        return "text/csv";
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java
new file mode 100644
index 0000000000..ebe29351fa
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteFastCSVResult.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.apache.nifi.schema.access.SchemaNameAsAttribute;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestWriteFastCSVResult {
+
+    /** Format shared by most tests: double-quote escaping, QuoteMode.NONE and an LF record separator. */
+    private static final CSVFormat NO_QUOTE_FORMAT = CSVFormat.DEFAULT.withEscape('\"').withQuoteMode(QuoteMode.NONE).withRecordSeparator("\n");
+
+    /** An action performed on an open writer between beginRecordSet() and finishRecordSet(). */
+    @FunctionalInterface
+    private interface WriterAction {
+        void apply(WriteFastCSVResult writer) throws IOException;
+    }
+
+    @Test
+    public void testDataTypes() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.RFC4180.withQuoteMode(QuoteMode.NON_NUMERIC).withRecordSeparator("\n");
+
+        final StringBuilder headerBuilder = new StringBuilder();
+        final List<RecordField> fields = new ArrayList<>();
+        for (final RecordFieldType fieldType : RecordFieldType.values()) {
+            // Locale.ROOT keeps names matching the value-map keys below regardless of the default locale (e.g. Turkish dotless i)
+            final String fieldName = fieldType.name().toLowerCase(java.util.Locale.ROOT);
+            if (fieldType == RecordFieldType.CHOICE) {
+                final List<DataType> possibleTypes = new ArrayList<>();
+                possibleTypes.add(RecordFieldType.INT.getDataType());
+                possibleTypes.add(RecordFieldType.LONG.getDataType());
+
+                fields.add(new RecordField(fieldName, fieldType.getChoiceDataType(possibleTypes)));
+            } else {
+                fields.add(new RecordField(fieldName, fieldType.getDataType()));
+            }
+
+            headerBuilder.append('"').append(fieldName).append('"').append(",");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final long now = System.currentTimeMillis();
+
+        try (final WriteFastCSVResult result = createWriter(csvFormat, schema, baos, "UTF-8")) {
+            final Map<String, Object> valueMap = new HashMap<>();
+            valueMap.put("string", "a孟bc李12儒3");
+            valueMap.put("boolean", true);
+            valueMap.put("byte", (byte) 1);
+            valueMap.put("char", 'c');
+            valueMap.put("short", (short) 8);
+            valueMap.put("int", 9);
+            valueMap.put("bigint", BigInteger.valueOf(8L));
+            valueMap.put("long", 8L);
+            valueMap.put("float", 8.0F);
+            valueMap.put("double", 8.0D);
+            valueMap.put("decimal", BigDecimal.valueOf(8.1D));
+            valueMap.put("date", new Date(now));
+            valueMap.put("time", new Time(now));
+            valueMap.put("timestamp", new Timestamp(now));
+            valueMap.put("record", null);
+            valueMap.put("choice", 48L);
+            valueMap.put("array", null);
+            valueMap.put("enum", null);
+            valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c");
+
+            final Record record = new MapRecord(schema, valueMap);
+            final RecordSet rs = RecordSet.of(schema, record);
+
+            result.write(rs);
+        }
+
+        final String output = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+
+        headerBuilder.deleteCharAt(headerBuilder.length() - 1);
+        final String headerLine = headerBuilder.toString();
+
+        final String[] splits = output.split("\n");
+        assertEquals(2, splits.length);
+        assertEquals(headerLine, splits[0]);
+
+        final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
+        final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
+        final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
+
+        final String values = splits[1];
+        final StringBuilder expectedBuilder = new StringBuilder();
+        expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue +
+                "\",\"" + dateValue + "\",\"" + timeValue + "\",\"8bb20bf2-ec41-4b94-80a4-922f4dba009c\",\"c\",,\"a孟bc李12儒3\",,\"48\",,");
+
+        final String expectedValues = expectedBuilder.toString();
+
+        assertEquals(expectedValues, values);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRecord() throws IOException {
+        final RecordSchema schema = createSchema("id");
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("id", "1");
+        values.put("name", "John");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.write(record));
+
+        assertEquals("id\n1\n", output);
+    }
+
+    @Test
+    public void testExtraFieldInWriteRawRecord() throws IOException {
+        final RecordSchema schema = createSchema("id");
+
+        // The fields defined in the schema should be written first followed by extra ones.
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("name", "John");
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.writeRawRecord(record));
+
+        assertEquals("id,name\n1,John\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRecord() throws IOException {
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.writeRecord(record));
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingFieldWriteRawRecord() throws IOException {
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.writeRawRecord(record));
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldWriteRecord() throws IOException {
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.writeRecord(record));
+
+        assertEquals("id,name\n1,\n", output);
+    }
+
+    @Test
+    public void testMissingAndExtraFieldWriteRawRecord() throws IOException {
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1");
+        values.put("dob", "1/1/1970");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(NO_QUOTE_FORMAT, schema, writer -> writer.writeRawRecord(record));
+
+        assertEquals("id,name,dob\n1,,1/1/1970\n", output);
+    }
+
+    @Test
+    public void testEscapeCharInValueWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\"').withQuote("\"".charAt(0)).withRecordSeparator("\n");
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1\"");
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(csvFormat, schema, writer -> writer.write(record));
+
+        assertEquals("id,name\n\"1\"\"\",John Doe\n", output);
+    }
+
+    @Test
+    public void testEmptyEscapeCharWriteRecord() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.RFC4180.withEscape(null).withQuote("\"".charAt(0)).withRecordSeparator("\n");
+        final RecordSchema schema = createSchema("id", "name");
+
+        final Map<String, Object> values = new LinkedHashMap<>();
+        values.put("id", "1\"");
+        values.put("name", "John Doe");
+        final Record record = new MapRecord(schema, values);
+
+        final String output = writeAscii(csvFormat, schema, writer -> writer.write(record));
+
+        // The first field is quoted as the Quote Mode defaults to MINIMAL and it has an escaped character in it
+        assertEquals("id,name\n\"1\"\"\",John Doe\n", output);
+    }
+
+    @Test
+    public void testWriteHeaderWithNoRecords() throws IOException {
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.builder().setEscape('\"').setQuoteMode(QuoteMode.NONE).setRecordSeparator("\n").build();
+        final RecordSchema schema = createSchema("id", "name");
+
+        // No records: the header must still be emitted when the record set is finished
+        final String output = writeAscii(csvFormat, schema, writer -> { });
+
+        assertEquals("id,name\n", output);
+    }
+
+    @Test
+    public void testUnsupportedEscapeCharacterRejected() {
+        // FastCSV only supports doubling the quote character, so a backslash escape must be refused up front
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withEscape('\\').withRecordSeparator("\n");
+        final RecordSchema schema = createSchema("id");
+
+        org.junit.jupiter.api.Assertions.assertThrows(IOException.class,
+                () -> createWriter(csvFormat, schema, new ByteArrayOutputStream(), "ASCII"));
+    }
+
+    /** Builds a schema in which every named field is a STRING. */
+    private static RecordSchema createSchema(final String... fieldNames) {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : fieldNames) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return new SimpleRecordSchema(fields);
+    }
+
+    /** Creates a header-writing WriteFastCSVResult using the default date/time/timestamp formats. */
+    private static WriteFastCSVResult createWriter(final CSVFormat csvFormat, final RecordSchema schema, final ByteArrayOutputStream baos,
+                                                   final String charSet) throws IOException {
+        return new WriteFastCSVResult(csvFormat, schema, new SchemaNameAsAttribute(), baos,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), true, charSet);
+    }
+
+    /**
+     * Runs {@code action} inside a begin/finish record set with an ASCII writer and returns the flushed output,
+     * decoded explicitly as US-ASCII rather than with the platform default charset.
+     */
+    private static String writeAscii(final CSVFormat csvFormat, final RecordSchema schema, final WriterAction action) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final String output;
+        try (final WriteFastCSVResult writer = createWriter(csvFormat, schema, baos, "ASCII")) {
+            writer.beginRecordSet();
+            action.apply(writer);
+            writer.finishRecordSet();
+            writer.flush();
+            output = baos.toString(StandardCharsets.US_ASCII.name());
+        }
+        return output;
+    }
+
+    private DateFormat getDateFormat(final String format) {
+        return new SimpleDateFormat(format);
+    }
+}
\ No newline at end of file