[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960378#comment-15960378 ]
Chesnay Schepler commented on FLINK-6276:
-----------------------------------------

I believe the problem is your generic join method, which basically throws away all information about K and V. I'm not sure if we can even fix this; [~twalthr], what do you think?

> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
>                 Key: FLINK-6276
>                 URL: https://issues.apache.org/jira/browse/FLINK-6276
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataSet API
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> Quite frequently when writing Flink code, I get the exception
> {{InvalidTypesException: Unknown Error. Type is null.}}
> A small example that triggers it is:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> public class TestMain {
>     @SafeVarargs
>     public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
>             DataSet<Tuple2<K, V>>... datasets) {
>         DataSet<Tuple2<K, List<V>>> join = null;
>         for (int i = 0; i < datasets.length; i++) {
>             final int datasetIdx = i;
>             if (datasetIdx == 0) {
>                 join = datasets[datasetIdx]
>                         .map(t -> {
>                             List<V> initialList = new ArrayList<>();
>                             initialList.add(t.f1);
>                             return new Tuple2<>(t.f0, initialList);
>                         })
>                         .name("start join");
>             } else {
>                 join = join.coGroup(datasets[datasetIdx]) //
>                         .where(0).equalTo(0) //
>                         .with((Iterable<Tuple2<K, List<V>>> li,
>                                 Iterable<Tuple2<K, V>> ri,
>                                 Collector<Tuple2<K, List<V>>> out) -> {
>                             K key = null;
>                             List<V> vals = new ArrayList<>(datasetIdx + 1);
>                             Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
>                             if (!lIter.hasNext()) {
>                                 for (int j = 0; j < datasetIdx; j++) {
>                                     vals.add(missingValuePlaceholder);
>                                 }
>                             } else {
>                                 Tuple2<K, List<V>> lt = lIter.next();
>                                 key = lt.f0;
>                                 vals.addAll(lt.f1);
>                                 if (lIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             Iterator<Tuple2<K, V>> rIter = ri.iterator();
>                             if (!rIter.hasNext()) {
>                                 vals.add(missingValuePlaceholder);
>                             } else {
>                                 Tuple2<K, V> rt = rIter.next();
>                                 key = rt.f0;
>                                 vals.add(rt.f1);
>                                 if (rIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             out.collect(new Tuple2<K, List<V>>(key, vals));
>                         }) //
>                         .name("join #" + datasetIdx);
>             }
>         }
>         return join;
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<String, Integer>> x = //
>                 env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
>         DataSet<Tuple2<String, Integer>> y = //
>                 env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
>         DataSet<Tuple2<String, Integer>> z = //
>                 env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
>         System.out.println(join(-1, x, y, z).collect());
>     }
> }
> {code}
> The stacktrace that is triggered is:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>     at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>     at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>     at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
>     ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>     ... 6 more
> {noformat}
> The code compiles fine, and typechecks. Maybe something is wrong with the code; but either way, Flink should report a better error message.
> A separate issue here is that the error message is being reported for the wrong function: the problem is not with the return type of {{join(TestMain.java:23)}}, it is some internal type (probably for a lambda or something) within the function. (It is the {{where}} clause that throws the exception.)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
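
Note on "throws away all information about K and V": the following is a minimal plain-Java sketch of that effect, not Flink code and not a description of how Flink's TypeExtractor works internally. The class name ErasureDemo and the helpers wrap, wrapInt and elementType are hypothetical names made up for this illustration. It compares what reflection can recover from a generic method, shaped loosely like the reporter's join(), with what it can recover from a method that uses concrete types.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Generic helper (hypothetical): V is a type variable, like K and V in join().
    public static <V> List<V> wrap(V value) {
        List<V> list = new ArrayList<>();
        list.add(value);
        return list;
    }

    // Concrete counterpart (hypothetical): the element type is written out.
    public static List<Integer> wrapInt(Integer value) {
        List<Integer> list = new ArrayList<>();
        list.add(value);
        return list;
    }

    // Returns the element type that reflection reports for the List returned
    // by the named method of this class.
    public static Type elementType(String methodName, Class<?> paramType) {
        try {
            Method m = ErasureDemo.class.getDeclaredMethod(methodName, paramType);
            ParameterizedType returnType = (ParameterizedType) m.getGenericReturnType();
            return returnType.getActualTypeArguments()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The erased parameter type of wrap(V) is Object.
        Type generic = elementType("wrap", Object.class);
        Type concrete = elementType("wrapInt", Integer.class);

        // Only the placeholder V is recorded for the generic method, even if
        // every caller passes an Integer.
        System.out.println("wrap:    " + generic
                + " (type variable: " + (generic instanceof TypeVariable) + ")");
        // The concrete method records a real class.
        System.out.println("wrapInt: " + concrete);
    }
}
```

In the generic case all that is available is the placeholder V, which matches the "due to type erasure" wording in the exception above. The exception text itself points to returns(...) and 'ResultTypeQueryable' as ways to supply the missing types. Whether a helper like join() can be made to work by having the caller hand in type hints is an assumption and is not confirmed in this thread.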