[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377698#comment-14377698 ]

ASF GitHub Bot commented on FLINK-1512:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/426#discussion_r27018473
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -52,103 +50,86 @@
        
        public static final String DEFAULT_FIELD_DELIMITER = ",";
     
    -
        private transient Object[] parsedValues;
    -   
    -   private byte[] commentPrefix = null;
    -   
    -   // To speed up readRecord processing. Used to find windows line endings.
    -   // It is set when open so that readRecord does not have to evaluate it
    -   private boolean lineDelimiterIsLinebreak = false;
    -   
    -   private transient int commentCount;
     
    -   private transient int invalidLineCount;
    -   
    -   
    -   public CsvInputFormat(Path filePath) {
    -           super(filePath);
    -   }       
    -   
    -   public CsvInputFormat(Path filePath, Class<?> ... types) {
    -           this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
    -   }       
    +   private Class<OUT> pojoTypeClass = null;
    +   private String[] pojoFieldsName = null;
    +   private transient Field[] pojoFields = null;
    +   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
    +
    +   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
    +           this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
    +   }
        
    -   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
    +   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
                super(filePath);
     
    +           Preconditions.checkArgument(typeInformation instanceof CompositeType);
    +           CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
    +
                setDelimiter(lineDelimiter);
                setFieldDelimiter(fieldDelimiter);
     
    -           setFieldTypes(types);
    -   }
    -   
    -   
    -   public byte[] getCommentPrefix() {
    -           return commentPrefix;
    -   }
    -   
    -   public void setCommentPrefix(byte[] commentPrefix) {
    -           this.commentPrefix = commentPrefix;
    -   }
    -   
    -   public void setCommentPrefix(char commentPrefix) {
    -           setCommentPrefix(String.valueOf(commentPrefix));
    -   }
    -   
    -   public void setCommentPrefix(String commentPrefix) {
    -           setCommentPrefix(commentPrefix, Charsets.UTF_8);
    -   }
    -   
    -   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
    -           if (charsetName == null) {
    -                   throw new IllegalArgumentException("Charset name must not be null");
    +           Class<?>[] classes = new Class<?>[typeInformation.getArity()];
    +           for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
    +                   classes[i] = compositeType.getTypeAt(i).getTypeClass();
                }
    -           
    -           if (commentPrefix != null) {
    -                   Charset charset = Charset.forName(charsetName);
    -                   setCommentPrefix(commentPrefix, charset);
    -           } else {
    -                   this.commentPrefix = null;
    +           setFieldTypes(classes);
    +
    +           if (typeInformation instanceof PojoTypeInfo) {
    +                   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
    +                   pojoTypeClass = typeInformation.getTypeClass();
    +                   pojoFieldsName = compositeType.getFieldNames();
    +                   setFieldsOrder(pojoFieldsName);
                }
        }
    -   
    -   public void setCommentPrefix(String commentPrefix, Charset charset) {
    -           if (charset == null) {
    -                   throw new IllegalArgumentException("Charset must not be null");
    +
    +   public void setFieldsOrder(String[] fieldsOrder) {
    +           Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
    +           Preconditions.checkNotNull(fieldsOrder);
    +
    +           int includedCount = 0;
    +           for (boolean isIncluded : fieldIncluded) {
    +                   if (isIncluded) {
    +                           includedCount++;
    +                   }
                }
    -           if (commentPrefix != null) {
    -                   this.commentPrefix = commentPrefix.getBytes(charset);
    -           } else {
    -                   this.commentPrefix = null;
    +
    +           Preconditions.checkArgument(includedCount == fieldsOrder.length,
    +                           "The number of selected POJO fields should be the same as that of CSV fields.");
    --- End diff ---
    
    How about `includedCount + " CSV fields and " + fieldsOrder.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal."`
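    
    For illustration, the check with that message plugged in would read roughly as follows
    (a sketch only, using the variable names from the diff above):
    
            Preconditions.checkArgument(includedCount == fieldsOrder.length,
                            includedCount + " CSV fields and " + fieldsOrder.length
                            + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");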


> Add CsvReader for reading into POJOs.
> -------------------------------------
>
>                 Key: FLINK-1512
>                 URL: https://issues.apache.org/jira/browse/FLINK-1512
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Robert Metzger
>            Assignee: Chiwan Park
>            Priority: Minor
>              Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types. 
> It would be nice if users were also able to read into POJOs.
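
A minimal sketch of the kind of usage this feature enables (assuming a pojoType-style
method on the CsvReader, as proposed in the pull request above; class, field, and path
names are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class CsvPojoExample {

        // Simple POJO whose public fields correspond to the CSV columns (illustrative).
        public static class Person {
            public String name;
            public int age;
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Map the CSV columns to POJO fields by name instead of reading into a Tuple2.
            DataSet<Person> persons = env.readCsvFile("/path/to/people.csv")
                    .pojoType(Person.class, "name", "age");

            persons.print();
        }
    }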



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
