[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356725#comment-14356725
 ] 

ASF GitHub Bot commented on FLINK-1512:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/426#discussion_r26203612
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -64,26 +70,45 @@
        private transient int commentCount;
     
        private transient int invalidLineCount;
    +
    +   private CompositeType<OUT> typeInformation = null;
    +
    +   private String[] fieldsMap = null;
        
    +   public CsvInputFormat(Path filePath, TypeInformation<OUT> 
typeInformation) {
    +           this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
    +   }
        
    -   public CsvInputFormat(Path filePath) {
    -           super(filePath);
    -   }       
    -   
    -   public CsvInputFormat(Path filePath, Class<?> ... types) {
    -           this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
    -   }       
    -   
    -   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class<?>... types) {
    +   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation<OUT> typeInformation) {
                super(filePath);
     
    +           Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
    +           this.typeInformation = (CompositeType<OUT>) typeInformation;
    +
                setDelimiter(lineDelimiter);
                setFieldDelimiter(fieldDelimiter);
     
    -           setFieldTypes(types);
    +           Class<?>[] classes = new Class<?>[typeInformation.getArity()];
    +           for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
    +                   classes[i] = 
this.typeInformation.getTypeAt(i).getTypeClass();
    +           }
    +           setFieldTypes(classes);
    +
    +           if (typeInformation instanceof PojoTypeInfo) {
    +                   setFieldsMap(this.typeInformation.getFieldNames());
    +                   setAccessibleToField();
    +           }
        }
    -   
    -   
    +
    +   public void setAccessibleToField() {
    --- End diff --
    
    Can we make this method private?


> Add CsvReader for reading into POJOs.
> -------------------------------------
>
>                 Key: FLINK-1512
>                 URL: https://issues.apache.org/jira/browse/FLINK-1512
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Robert Metzger
>            Assignee: Chiwan Park
>            Priority: Minor
>              Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types. 
> It would be nice if users were also able to read into POJOs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to