[
https://issues.apache.org/jira/browse/NIFI-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298633#comment-16298633
]
ASF GitHub Bot commented on NIFI-4496:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2245#discussion_r158051319
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
---
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.io.input.BOMInputStream;
+import org.apache.commons.lang3.CharUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+
+public class JacksonCSVRecordReader implements RecordReader {
+ private final RecordSchema schema;
+
+ private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+ private final ComponentLog logger;
+ private final boolean hasHeader;
+ private final boolean ignoreHeader;
+ private final MappingIterator<String[]> recordStream;
+ private List<String> rawFieldNames = null;
+
+ private volatile static CsvMapper mapper = new
CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+ public JacksonCSVRecordReader(final InputStream in, final ComponentLog
logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean
hasHeader, final boolean ignoreHeader,
+ final String dateFormat, final String
timeFormat, final String timestampFormat, final String encoding) throws
IOException {
+
+ this.schema = schema;
+ this.logger = logger;
+ this.hasHeader = hasHeader;
+ this.ignoreHeader = ignoreHeader;
+ final DateFormat df = dateFormat == null ? null :
DataTypeUtils.getDateFormat(dateFormat);
+ final DateFormat tf = timeFormat == null ? null :
DataTypeUtils.getDateFormat(timeFormat);
+ final DateFormat tsf = timestampFormat == null ? null :
DataTypeUtils.getDateFormat(timestampFormat);
+
+ LAZY_DATE_FORMAT = () -> df;
+ LAZY_TIME_FORMAT = () -> tf;
+ LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+ final Reader reader = new InputStreamReader(new
BOMInputStream(in));
+
+ CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
+ .setColumnSeparator(csvFormat.getDelimiter())
+ .setLineSeparator(csvFormat.getRecordSeparator())
+ // Can only use comments in Jackson CSV if the correct
marker is set
+ .setAllowComments("#"
.equals(CharUtils.toString(csvFormat.getCommentMarker())))
+ // The call to setUseHeader(false) in all code paths is
due to the way Jackson does data binding/mapping. Missing or extra columns may
not
+ // be handled correctly when using the header for mapping.
+ .setUseHeader(false);
+
+ csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ?
csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
+ csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ?
csvSchemaBuilder :
csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
+
+ if (hasHeader) {
+ if (ignoreHeader) {
+ csvSchemaBuilder =
csvSchemaBuilder.setSkipFirstDataRow(true);
+ }
+ }
+
+ CsvSchema csvSchema = csvSchemaBuilder.build();
+
+ // Add remaining config options to the mapper
+ List<CsvParser.Feature> features = new ArrayList<>(3);
+ features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
+ if (csvFormat.getIgnoreEmptyLines()) {
+ features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
+ }
+ if (csvFormat.getTrim()) {
+ features.add(CsvParser.Feature.TRIM_SPACES);
+ }
+
+ ObjectReader objReader = mapper.readerFor(String[].class)
+ .with(csvSchema)
+ .withFeatures(features.toArray(new CsvParser.Feature[3]));
+
+ recordStream = objReader.readValues(reader);
+ }
+
+ @Override
+ public Record nextRecord(final boolean coerceTypes, final boolean
dropUnknownFields) throws IOException, MalformedRecordException {
+ final RecordSchema schema = getSchema();
+
+ if (recordStream.hasNext()) {
+ String[] csvRecord = null;
+ try {
+ csvRecord = recordStream.next();
+ } catch (Exception e) {
--- End diff --
This gives me pause. I think if we catch an IOException, we should let it fly (i.e., rethrow it unchanged).
Otherwise, we should wrap the Exception in a MalformedRecordException.
> Improve performance of CSVReader
> --------------------------------
>
> Key: NIFI-4496
> URL: https://issues.apache.org/jira/browse/NIFI-4496
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> During some throughput testing, it was noted that the CSVReader was not as
> fast as desired, processing less than 50k records per second. A look at [this
> benchmark|https://github.com/uniVocity/csv-parsers-comparison] implies that
> the Apache Commons CSV parser (used by CSVReader) is quite slow compared to
> others.
> From that benchmark it appears that CSVReader could be enhanced by using a
> different CSV parser under the hood. Perhaps Jackson is the best choice, as
> it is fast when values are quoted, and is a mature and maintained codebase.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)