[ https://issues.apache.org/jira/browse/NIFI-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298633#comment-16298633 ]
ASF GitHub Bot commented on NIFI-4496: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2245#discussion_r158051319 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java --- @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.io.input.BOMInputStream; +import org.apache.commons.lang3.CharUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + + +public class JacksonCSVRecordReader implements RecordReader { + private final RecordSchema schema; + + private final Supplier<DateFormat> LAZY_DATE_FORMAT; + private final Supplier<DateFormat> LAZY_TIME_FORMAT; + private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT; + + private final ComponentLog logger; + private final boolean hasHeader; + private final boolean ignoreHeader; + private final MappingIterator<String[]> recordStream; + private List<String> rawFieldNames = null; + + private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY); + + public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema 
schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, + final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException { + + this.schema = schema; + this.logger = logger; + this.hasHeader = hasHeader; + this.ignoreHeader = ignoreHeader; + final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); + + LAZY_DATE_FORMAT = () -> df; + LAZY_TIME_FORMAT = () -> tf; + LAZY_TIMESTAMP_FORMAT = () -> tsf; + + final Reader reader = new InputStreamReader(new BOMInputStream(in)); + + CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder() + .setColumnSeparator(csvFormat.getDelimiter()) + .setLineSeparator(csvFormat.getRecordSeparator()) + // Can only use comments in Jackson CSV if the correct marker is set + .setAllowComments("#" .equals(CharUtils.toString(csvFormat.getCommentMarker()))) + // The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not + // be handled correctly when using the header for mapping. + .setUseHeader(false); + + csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter()); + csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ? 
csvSchemaBuilder : csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter()); + + if (hasHeader) { + if (ignoreHeader) { + csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true); + } + } + + CsvSchema csvSchema = csvSchemaBuilder.build(); + + // Add remaining config options to the mapper + List<CsvParser.Feature> features = new ArrayList<>(3); + features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS); + if (csvFormat.getIgnoreEmptyLines()) { + features.add(CsvParser.Feature.SKIP_EMPTY_LINES); + } + if (csvFormat.getTrim()) { + features.add(CsvParser.Feature.TRIM_SPACES); + } + + ObjectReader objReader = mapper.readerFor(String[].class) + .with(csvSchema) + .withFeatures(features.toArray(new CsvParser.Feature[3])); + + recordStream = objReader.readValues(reader); + } + + @Override + public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { + final RecordSchema schema = getSchema(); + + if (recordStream.hasNext()) { + String[] csvRecord = null; + try { + csvRecord = recordStream.next(); + } catch (Exception e) { --- End diff -- This gives me pause. I think that if we catch an IOException, we should rethrow it as-is. Otherwise, we should wrap the Exception in a MalformedRecordException. > Improve performance of CSVReader > -------------------------------- > > Key: NIFI-4496 > URL: https://issues.apache.org/jira/browse/NIFI-4496 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > > During some throughput testing, it was noted that the CSVReader was not as > fast as desired, processing fewer than 50k records per second. A look at [this > benchmark|https://github.com/uniVocity/csv-parsers-comparison] implies that > the Apache Commons CSV parser (used by CSVReader) is quite slow compared to > others. > From that benchmark it appears that CSVReader could be enhanced by using a > different CSV parser under the hood.
Perhaps Jackson is the best choice, as > it is fast when values are quoted, and is a mature and maintained codebase. -- This message was sent by Atlassian JIRA (v6.4.14#64029)