[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169089#comment-16169089 ]

ASF GitHub Bot commented on FLINK-7050:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4660#discussion_r139294269
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.batch.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.io.ParseException;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.types.parser.FieldParser;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +
    +import com.fasterxml.jackson.databind.MappingIterator;
    +import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    +import com.fasterxml.jackson.dataformat.csv.CsvParser;
    +import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    +
    +import org.apache.commons.io.input.BoundedInputStream;
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * InputFormat that reads CSV files and is compliant with the RFC 4180 standard.
    + *
    + * @param <OUT> the type of records produced by this input format
    + */
    +@Internal
    +public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +   public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +   private String recordDelimiter = "\n";
    +   private char fieldDelimiter = ',';
    +
    +   private boolean skipFirstLineAsHeader;
    +   private boolean skipFirstLine = false; // only for first split
    +
    +   private boolean quotedStringParsing = false;
    +   private char quoteCharacter;
    +
    +   private boolean lenient;
    +
    +   private String commentPrefix = null;
    +   private boolean allowComments = false;
    +
    +   private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
    +
    +   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
    +
    +   private Class<?>[] fieldTypes = EMPTY_TYPES;
    +
    +   private boolean[] fieldIncluded = EMPTY_INCLUDED;
    +
    +   MappingIterator<Object[]> recordIterator = null;
    +
    +   private boolean endOfSplit = false;
    +
    +   protected RFCCsvInputFormat(Path filePath) {
    +           super(filePath);
    +   }
    +
    +   @Override
    +   public void open(FileInputSplit split) throws IOException {
    +
    +           super.open(split);
    +
    +           CsvMapper mapper = new CsvMapper();
    +           mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
    +
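    +           // align the read range to record boundaries: skip the partial record at the
    +           // start of this split and read past the split end to finish its last record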
    +           long firstDelimiterPosition = findFirstDelimiterPosition();
    +           long lastDelimiterPosition = findLastDelimiterPosition();
    +           long startPos = this.splitStart + firstDelimiterPosition;
    +           long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
    +           this.stream.seek(startPos);
    +           BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
    +
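    +           // a header row can only occur in the split that starts at the beginning of the file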
    +           if (skipFirstLineAsHeader && startPos == 0) {
    +                   skipFirstLine = true;
    +           }
    +
    +           CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
    +           CsvSchema csvSchema = configureParserSettings();
    +           csvParser.setSchema(csvSchema);
    +
    +           recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
    +   }
    +
    +   private CsvSchema configureParserSettings() {
    +
    +           CsvSchema csvSchema = CsvSchema.builder()
    +                   .setLineSeparator(this.recordDelimiter)
    +                   .setColumnSeparator(this.fieldDelimiter)
    +                   .setSkipFirstDataRow(skipFirstLine)
    +                   .setQuoteChar(this.quoteCharacter)
    +                   .setAllowComments(allowComments)
    +                   .build();
    +           return csvSchema;
    +   }
    +
    +   public OUT nextRecord(OUT reuse) throws IOException {
    +
    +           if (recordIterator == null) {
    +                   return null;
    +           }
    +
    +           if (recordIterator.hasNext()) {
    +
    +                   Object[] record = recordIterator.next();
    +
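    +                   // a record with fewer fields than expected is skipped in lenient mode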
    +                   if (record.length < fieldTypes.length) {
    +                           if (isLenient()) {
    +                                   return nextRecord(reuse);
    +                           }
    +                           else {
    +                                   throw new ParseException();
    +                           }
    +                   }
    +
    +                   try {
    +                           return fillRecord(reuse, castRecord(projectedFields(record)));
    +                   }
    +                   catch (IOException e) {
    +                           if (isLenient()) {
    +                                   return nextRecord(reuse);
    +                           }
    +                           else {
    +                                   throw new ParseException(e);
    +                           }
    +                   }
    +           }
    +           endOfSplit = true;
    +           return null;
    +   }
    +
    +   protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    +
    +   @Override
    +   public String toString() {
    +           return "CSV Input (" + 
StringUtils.showControlCharacters(String.valueOf(fieldDelimiter)) + ") " + 
getFilePath();
    +   }
    +
    +   @Override
    +   public boolean reachedEnd() throws IOException {
    +           return endOfSplit;
    +   }
    +
    +   private Object[] projectedFields(Object[] record) throws IOException {
    +
    +           Object[] resultantRecord = new Object[fieldTypes.length];
    +           int index = 0;
    +           for (int i = 0; i < this.fieldIncluded.length; i++) {
    +
    +                   try {
    +                           if (fieldIncluded[i]) {
    +                                   resultantRecord[index++] = record[i];
    +                           }
    +                   }
    +                   catch (Exception e) {
    +                           throw new IOException(e);
    +                   }
    +           }
    +           return resultantRecord;
    +   }
    +
    +   private long findFirstDelimiterPosition() throws IOException {
    +
    +           this.stream.seek(this.getSplitStart());
    +           if (this.stream.getPos() == 0) {
    +                   return 0;
    +           }
    +           else {
    +                   int pos = 1;
    +                   while ((this.stream.read()) != this.recordDelimiter.charAt(0)) {
    --- End diff --
    
    You only check the first character of the delimiter. We do not enforce a 
    single-character delimiter in the setter, and the CSV parser seems to support 
    multi-character delimiters as well. Hence, we must check for the full 
    delimiter. It would also make sense to convert the delimiter into a `char[]` 
    for more efficient access to its characters.
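    
    For illustration, here is a minimal sketch of such a check (field and method 
    names follow the PR; the matching logic itself is an assumption, and the 
    simple restart below only handles delimiters without repeating prefixes, such 
    as `"\n"` or `"\r\n"`; a fully general version would need KMP-style prefix 
    handling):
    
    ```java
    // Sketch: scan for the full record delimiter instead of only its first
    // character, using a char[] to avoid repeated String.charAt() calls.
    // Assumes the delimiter consists of single-byte (ASCII) characters.
    private long findFirstDelimiterPosition() throws IOException {
        this.stream.seek(this.getSplitStart());
        if (this.stream.getPos() == 0) {
            return 0;
        }
        final char[] delim = this.recordDelimiter.toCharArray();
        long pos = 0;
        int matched = 0;
        int b;
        while (matched < delim.length && (b = this.stream.read()) != -1) {
            pos++;
            if (b == delim[matched]) {
                matched++;
            } else {
                // restart matching; the current byte may itself begin a new delimiter
                matched = (b == delim[0]) ? 1 : 0;
            }
        }
        // pos is now the offset just past the first complete delimiter in this split
        return pos;
    }
    ```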


> RFC Compliant CSV Parser for Table Source
> -----------------------------------------
>
>                 Key: FLINK-7050
>                 URL: https://issues.apache.org/jira/browse/FLINK-7050
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.3.1
>            Reporter: Usman Younas
>            Assignee: Usman Younas
>              Labels: csv, parsing
>             Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes or field 
> delimiters inside fields.
> To reproduce the bug, take a CSV file with double quotes inside a field of a 
> record and parse it with the Flink CSV parser. One such issue is described in 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> These CSV issues will be resolved by making the CSV parser compliant with the 
> RFC 4180 standard for the Table Source.
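> For example, an RFC 4180 compliant parser must accept a record like the 
> following (illustrative data, not taken from the issue), where a quoted field 
> contains both the field delimiter and an escaped double quote:
> {code}
> id,name,comment
> 1,"Doe, Jane","She said ""hello"""
> {code}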



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
