Repository: flink
Updated Branches:
  refs/heads/release-1.5 5e71bf872 -> 30c224abd


[FLINK-6909] [types] Fix error message in CsvReader for wrong type class

This closes #6037.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c224ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c224ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c224ab

Branch: refs/heads/release-1.5
Commit: 30c224abd0c4344006fd4462feb7fab6b8bc66d3
Parents: 5e71bf8
Author: Timo Walther <[email protected]>
Authored: Thu May 17 17:35:55 2018 +0200
Committer: Timo Walther <[email protected]>
Committed: Tue May 22 14:48:21 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/CsvReader.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30c224ab/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index df90d54..6c4d4b7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
@@ -316,14 +317,18 @@ public class CsvReader {
                Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
                Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
 
-               @SuppressWarnings("unchecked")
-               PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType);
+               final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
+               if (!(ti instanceof PojoTypeInfo)) {
+                       throw new IllegalArgumentException(
+                               "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
+               }
+               final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;
 
-               CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask);
+               CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);
 
                configureInputFormat(inputFormat);
 
-               return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
+               return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
        }
 
        /**

Reply via email to