Repository: flink Updated Branches: refs/heads/release-1.4 7cd2b392a -> 73ca6cc0f
[FLINK-6909] [types] Fix error message in CsvReader for wrong type class This closes #6037. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73ca6cc0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73ca6cc0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73ca6cc0 Branch: refs/heads/release-1.4 Commit: 73ca6cc0fac02b3048c798b1640484fb96985148 Parents: 7cd2b39 Author: Timo Walther <[email protected]> Authored: Thu May 17 17:35:55 2018 +0200 Committer: Timo Walther <[email protected]> Committed: Tue May 22 14:49:49 2018 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/io/CsvReader.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/73ca6cc0/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index df90d54..6c4d4b7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.io; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; @@ -316,14 +317,18 @@ public class CsvReader { Preconditions.checkNotNull(pojoType, "The POJO type class must not be null."); Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO."); - @SuppressWarnings("unchecked") - PojoTypeInfo<T> typeInfo = (PojoTypeInfo<T>) TypeExtractor.createTypeInfo(pojoType); + final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType); + if (!(ti instanceof PojoTypeInfo)) { + throw new IllegalArgumentException( + "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti); + } + final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti; - CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask); + CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask); configureInputFormat(inputFormat); - return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); + return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName()); } /**
