[ 
https://issues.apache.org/jira/browse/NIFI-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16298633#comment-16298633
 ] 

ASF GitHub Bot commented on NIFI-4496:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2245#discussion_r158051319
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
 ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.csv;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.databind.ObjectReader;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +import org.apache.commons.csv.CSVFormat;
    +import org.apache.commons.io.input.BOMInputStream;
    +import org.apache.commons.lang3.CharUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.Reader;
    +import java.text.DateFormat;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.function.Supplier;
    +
    +
    +public class JacksonCSVRecordReader implements RecordReader {
    +    private final RecordSchema schema;
    +
    +    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
    +    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
    +
    +    private final ComponentLog logger;
    +    private final boolean hasHeader;
    +    private final boolean ignoreHeader;
    +    private final MappingIterator<String[]> recordStream;
    +    private List<String> rawFieldNames = null;
    +
    +    private volatile static CsvMapper mapper = new 
CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
    +    public JacksonCSVRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean 
hasHeader, final boolean ignoreHeader,
    +                                  final String dateFormat, final String 
timeFormat, final String timestampFormat, final String encoding) throws 
IOException {
    +
    +        this.schema = schema;
    +        this.logger = logger;
    +        this.hasHeader = hasHeader;
    +        this.ignoreHeader = ignoreHeader;
    +        final DateFormat df = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
    +        final DateFormat tf = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
    +        final DateFormat tsf = timestampFormat == null ? null : 
DataTypeUtils.getDateFormat(timestampFormat);
    +
    +        LAZY_DATE_FORMAT = () -> df;
    +        LAZY_TIME_FORMAT = () -> tf;
    +        LAZY_TIMESTAMP_FORMAT = () -> tsf;
    +
    +        final Reader reader = new InputStreamReader(new 
BOMInputStream(in));
    +
    +        CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
    +                .setColumnSeparator(csvFormat.getDelimiter())
    +                .setLineSeparator(csvFormat.getRecordSeparator())
    +                // Can only use comments in Jackson CSV if the correct 
marker is set
    +                .setAllowComments("#" 
.equals(CharUtils.toString(csvFormat.getCommentMarker())))
    +                // The call to setUseHeader(false) in all code paths is 
due to the way Jackson does data binding/mapping. Missing or extra columns may 
not
    +                // be handled correctly when using the header for mapping.
    +                .setUseHeader(false);
    +
    +        csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ? 
csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
    +        csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ? 
csvSchemaBuilder : 
csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
    +
    +        if (hasHeader) {
    +            if (ignoreHeader) {
    +                csvSchemaBuilder = 
csvSchemaBuilder.setSkipFirstDataRow(true);
    +            }
    +        }
    +
    +        CsvSchema csvSchema = csvSchemaBuilder.build();
    +
    +        // Add remaining config options to the mapper
    +        List<CsvParser.Feature> features = new ArrayList<>(3);
    +        features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
    +        if (csvFormat.getIgnoreEmptyLines()) {
    +            features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
    +        }
    +        if (csvFormat.getTrim()) {
    +            features.add(CsvParser.Feature.TRIM_SPACES);
    +        }
    +
    +        ObjectReader objReader = mapper.readerFor(String[].class)
    +                .with(csvSchema)
    +                .withFeatures(features.toArray(new CsvParser.Feature[3]));
    +
    +        recordStream = objReader.readValues(reader);
    +    }
    +
    +    @Override
    +    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
    +        final RecordSchema schema = getSchema();
    +
    +        if (recordStream.hasNext()) {
    +            String[] csvRecord = null;
    +            try {
    +                csvRecord = recordStream.next();
    +            } catch (Exception e) {
    --- End diff --
    
    This gives me pause. I think that if we catch an IOException here, we 
should let it propagate unchanged. Otherwise, we should wrap the Exception in a 
MalformedRecordException.


> Improve performance of CSVReader
> --------------------------------
>
>                 Key: NIFI-4496
>                 URL: https://issues.apache.org/jira/browse/NIFI-4496
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> During some throughput testing, it was noted that the CSVReader was not as 
> fast as desired, processing less than 50k records per second. A look at [this 
> benchmark|https://github.com/uniVocity/csv-parsers-comparison] implies that 
> the Apache Commons CSV parser (used by CSVReader) is quite slow compared to 
> others.
> From that benchmark it appears that CSVReader could be enhanced by using a 
> different CSV parser under the hood. Perhaps Jackson is the best choice, as 
> it is fast when values are quoted, and is a mature and maintained codebase.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to