Repository: flink
Updated Branches:
  refs/heads/master 5ac96c7c2 -> 1c5a929c7


[FLINK-6909] [types] Fix error message in CsvReader for wrong type class

This closes #6037.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c5a929c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c5a929c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c5a929c

Branch: refs/heads/master
Commit: 1c5a929c7ad6e40939e1170cdaf19c6ffd64f583
Parents: 5ac96c7
Author: Timo Walther <[email protected]>
Authored: Thu May 17 17:35:55 2018 +0200
Committer: Timo Walther <[email protected]>
Committed: Tue May 22 14:45:36 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/CsvReader.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c5a929c/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index df90d54..6c4d4b7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
@@ -316,14 +317,18 @@ public class CsvReader {
                Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
                Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
 
-               @SuppressWarnings("unchecked")
-               PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
+               final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
+               if (!(ti instanceof PojoTypeInfo)) {
+                       throw new IllegalArgumentException(
+                               "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
+               }
+               final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;
 
-               CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask);
+               CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);
 
                configureInputFormat(inputFormat);
 
-               return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
+               return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
        }
 
        /**

Reply via email to