[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377698#comment-14377698 ]
ASF GitHub Bot commented on FLINK-1512:
---------------------------------------

GitHub user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018473

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@ public static final String DEFAULT_FIELD_DELIMITER = ","; - private transient Object[] parsedValues; - - private byte[] commentPrefix = null; - - // To speed up readRecord processing. Used to find windows line endings. - // It is set when open so that readRecord does not have to evaluate it - private boolean lineDelimiterIsLinebreak = false; - - private transient int commentCount; - private transient int invalidLineCount; - - - public CsvInputFormat(Path filePath) { - super(filePath); - } - - public CsvInputFormat(Path filePath, Class<?> ... types) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types); - } + private Class<OUT> pojoTypeClass = null; + private String[] pojoFieldsName = null; + private transient Field[] pojoFields = null; + private transient PojoTypeInfo<OUT> pojoTypeInfo = null; + + public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation); + } - public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... 
types) { + public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) { super(filePath); + Preconditions.checkArgument(typeInformation instanceof CompositeType); + CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation; + setDelimiter(lineDelimiter); setFieldDelimiter(fieldDelimiter); - setFieldTypes(types); - } - - - public byte[] getCommentPrefix() { - return commentPrefix; - } - - public void setCommentPrefix(byte[] commentPrefix) { - this.commentPrefix = commentPrefix; - } - - public void setCommentPrefix(char commentPrefix) { - setCommentPrefix(String.valueOf(commentPrefix)); - } - - public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, Charsets.UTF_8); - } - - public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { - if (charsetName == null) { - throw new IllegalArgumentException("Charset name must not be null"); + Class<?>[] classes = new Class<?>[typeInformation.getArity()]; + for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) { + classes[i] = compositeType.getTypeAt(i).getTypeClass(); } - - if (commentPrefix != null) { - Charset charset = Charset.forName(charsetName); - setCommentPrefix(commentPrefix, charset); - } else { - this.commentPrefix = null; + setFieldTypes(classes); + + if (typeInformation instanceof PojoTypeInfo) { + pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation; + pojoTypeClass = typeInformation.getTypeClass(); + pojoFieldsName = compositeType.getFieldNames(); + setFieldsOrder(pojoFieldsName); } } - - public void setCommentPrefix(String commentPrefix, Charset charset) { - if (charset == null) { - throw new IllegalArgumentException("Charset must not be null"); + + public void setFieldsOrder(String[] fieldsOrder) { + Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields."); + 
Preconditions.checkNotNull(fieldsOrder); + + int includedCount = 0; + for (boolean isIncluded : fieldIncluded) { + if (isIncluded) { + includedCount++; + } } - if (commentPrefix != null) { - this.commentPrefix = commentPrefix.getBytes(charset); - } else { - this.commentPrefix = null; + + Preconditions.checkArgument(includedCount == fieldsOrder.length, + "The number of selected POJO fields should be the same as that of CSV fields.");
--- End diff --

How about `includedCount + " CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal."`

> Add CsvReader for reading into POJOs.
> -------------------------------------
>
> Key: FLINK-1512
> URL: https://issues.apache.org/jira/browse/FLINK-1512
> Project: Flink
> Issue Type: New Feature
> Components: Java API, Scala API
> Reporter: Robert Metzger
> Assignee: Chiwan Park
> Priority: Minor
> Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types.
> It would be nice if users were also able to read into POJOs.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)