[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998999#comment-14998999 ]

ASF GitHub Bot commented on FLINK-2692:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1266#discussion_r44439588
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -18,32 +18,97 @@
     
     package org.apache.flink.api.java.io;
     
    +import com.google.common.base.Preconditions;
    +import com.google.common.primitives.Ints;
    +import org.apache.flink.api.common.io.GenericCsvInputFormat;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.types.parser.FieldParser;
     
    -import org.apache.flink.api.common.typeutils.CompositeType;
    -import org.apache.flink.api.java.tuple.Tuple;
    +import java.io.IOException;
     import org.apache.flink.core.fs.Path;
     import org.apache.flink.util.StringUtils;
     
    -public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
    +public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
     
        private static final long serialVersionUID = 1L;
    +
    +   public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +   public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +   protected transient Object[] parsedValues;
        
    -   public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
    -           super(filePath, typeInformation);
    +   protected CsvInputFormat(Path filePath) {
    +           super(filePath);
        }
    -   
    -   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
    -           super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
    +
    +   @Override
    +   public void open(FileInputSplit split) throws IOException {
    +           super.open(split);
    +
    +           @SuppressWarnings("unchecked")
    +           FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
    +
    +           //throw exception if no field parsers are available
    +           if (fieldParsers.length == 0) {
    +                   throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
    +           }
    +
    +           // create the value holders
    +           this.parsedValues = new Object[fieldParsers.length];
    +           for (int i = 0; i < fieldParsers.length; i++) {
    +                   this.parsedValues[i] = fieldParsers[i].createValue();
    +           }
    +
    +           // left to right evaluation makes access [0] okay
    +           // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
    +           if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
    +                   this.lineDelimiterIsLinebreak = true;
    +           }
    +
    +           this.commentCount = 0;
    +           this.invalidLineCount = 0;
        }
     
        @Override
    -   protected OUT createTuple(OUT reuse) {
    -           Tuple result = (Tuple) reuse;
    -           for (int i = 0; i < parsedValues.length; i++) {
    -                   result.setField(parsedValues[i], i);
    +   public OUT nextRecord(OUT record) throws IOException {
    +           OUT returnRecord = null;
    +           do {
    +                   returnRecord = super.nextRecord(record);
    +           } while (returnRecord == null && !reachedEnd());
    +
    +           return returnRecord;
    +   }
    +
    +   public Class<?>[] getFieldTypes() {
    +           return super.getGenericFieldTypes();
    +   }
    +
    +   protected static boolean[] createDefaultMask(int size) {
    +           boolean[] includedMask = new boolean[size];
    +           for (int x=0; x<includedMask.length; x++) {
    +                   includedMask[x] = true;
    +           }
    +           return includedMask;
    +   }
    +
    +   protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    --- End diff --
    
    I see your point, but I don't think it's due to this method. It follows a similar implementation in GenericCsvInputFormat.setFieldsGeneric that was used until now.
    
    The key thing is that previously we checked that the indices were in monotonic order, so the case you described couldn't occur. That check wasn't technically necessary, hence I removed it.
    
    We can either re-add that check, or add documentation to the CsvInputFormat constructor and the Scala ExecutionEnvironment.readCsvFile method.
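
A minimal sketch of the two options above, under stated assumptions: the body of `toBooleanMask` is not shown in the diff, so the version here is only a guess at its general shape, and `checkStrictlyAscending` and the class name `FieldMaskSketch` are hypothetical. It illustrates why out-of-order indices would go unnoticed once they are turned into a mask: the mask records which fields are included, not the order in which they were requested.

```java
import java.util.Arrays;

public class FieldMaskSketch {

    // Assumed shape of the helper: marks every requested source field as included.
    static boolean[] toBooleanMask(int[] sourceFieldIndices) {
        if (sourceFieldIndices == null) {
            throw new NullPointerException("sourceFieldIndices must not be null");
        }
        int max = -1;
        for (int index : sourceFieldIndices) {
            if (index < 0) {
                throw new IllegalArgumentException("Field indices must not be negative.");
            }
            max = Math.max(max, index);
        }
        boolean[] includedMask = new boolean[max + 1];
        for (int index : sourceFieldIndices) {
            includedMask[index] = true;
        }
        return includedMask;
    }

    // Hypothetical version of the removed check: indices must be strictly ascending.
    static void checkStrictlyAscending(int[] sourceFieldIndices) {
        for (int i = 1; i < sourceFieldIndices.length; i++) {
            if (sourceFieldIndices[i] <= sourceFieldIndices[i - 1]) {
                throw new IllegalArgumentException(
                        "Field indices must be unique and in ascending order.");
            }
        }
    }

    public static void main(String[] args) {
        // Both calls yield the same mask, so the requested order is lost.
        System.out.println(Arrays.toString(toBooleanMask(new int[] {0, 2}))); // [true, false, true]
        System.out.println(Arrays.toString(toBooleanMask(new int[] {2, 0}))); // [true, false, true]
    }
}
```

Re-adding a check along the lines of `checkStrictlyAscending` would reject `{2, 0}` up front; the documentation route would instead state that the order of the indices is ignored.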


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-2692
>                 URL: https://issues.apache.org/jira/browse/FLINK-2692
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>            Priority: Minor
>
> The {{CsvInputFormat}} currently allows values to be returned as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the POJO field 
> information is constructed by calling setter methods which have to be called 
> in a very specific order, otherwise they fail. E.g., one has to call 
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the 
> number of fields might be different. Furthermore, some of the methods can 
> only be called if the return type is a {{Pojo}} type, because they expect 
> that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code more 
> easily maintainable. I propose to split it up into a 
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}} which take all 
> the required information via their constructors instead of using the 
> {{setFields}} and {{setOrderOfPOJOFields}} approach.
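
The constructor-based approach proposed in the description could look roughly like the sketch below. `TupleCsvFormatSketch` and its parameters are hypothetical stand-ins, not the classes introduced by the pull request; the point is only that all required information arrives at once, so consistency can be validated in one place and no call order has to be remembered.

```java
public class ConstructorConfigSketch {

    // Hypothetical stand-in for a tuple-oriented CSV format; not the Flink class.
    static final class TupleCsvFormatSketch {
        private final String filePath;
        private final Class<?>[] fieldTypes;
        private final boolean[] includedMask;

        TupleCsvFormatSketch(String filePath, Class<?>[] fieldTypes, boolean[] includedMask) {
            // Validate everything together: one type per included field.
            int included = 0;
            for (boolean isIncluded : includedMask) {
                if (isIncluded) {
                    included++;
                }
            }
            if (included != fieldTypes.length) {
                throw new IllegalArgumentException(
                        "Expected " + included + " field types but got " + fieldTypes.length);
            }
            this.filePath = filePath;
            this.fieldTypes = fieldTypes.clone();
            this.includedMask = includedMask.clone();
        }

        int getNumberOfFields() {
            return fieldTypes.length;
        }
    }

    public static void main(String[] args) {
        // Read the first and third column of a (hypothetical) file as String and Integer.
        TupleCsvFormatSketch format = new TupleCsvFormatSketch(
                "/tmp/input.csv",
                new Class<?>[] {String.class, Integer.class},
                new boolean[] {true, false, true});
        System.out.println(format.getNumberOfFields());
    }
}
```

With setters, the same mismatch between the number of types and the number of included fields would only surface if the calls happened in the right order; here it fails at construction time.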



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
