[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356725#comment-14356725 ]
ASF GitHub Bot commented on FLINK-1512:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/426#discussion_r26203612

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -64,26 +70,45 @@
         private transient int commentCount;
         private transient int invalidLineCount;
    +
    +    private CompositeType<OUT> typeInformation = null;
    +
    +    private String[] fieldsMap = null;

    +    public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
    +        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
    +    }

    -    public CsvInputFormat(Path filePath) {
    -        super(filePath);
    -    }
    -
    -    public CsvInputFormat(Path filePath, Class<?> ... types) {
    -        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
    -    }
    -
    -    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
    +    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
             super(filePath);

    +        Preconditions.checkArgument(typeInformation instanceof CompositeType);
    +        this.typeInformation = (CompositeType<OUT>) typeInformation;
    +
             setDelimiter(lineDelimiter);
             setFieldDelimiter(fieldDelimiter);

    -        setFieldTypes(types);
    +        Class<?>[] classes = new Class<?>[typeInformation.getArity()];
    +        for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
    +            classes[i] = this.typeInformation.getTypeAt(i).getTypeClass();
    +        }
    +        setFieldTypes(classes);
    +
    +        if (typeInformation instanceof PojoTypeInfo) {
    +            setFieldsMap(this.typeInformation.getFieldNames());
    +            setAccessibleToField();
    +        }
         }
    -
    -
    +
    +    public void setAccessibleToField() {
    --- End diff --

    Can we make this method private?

> Add CsvReader for reading into POJOs.
> -------------------------------------
>
>                 Key: FLINK-1512
>                 URL: https://issues.apache.org/jira/browse/FLINK-1512
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Robert Metzger
>            Assignee: Chiwan Park
>            Priority: Minor
>              Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types.
> It would be nice if users were also able to read into POJOs.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)