[
https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-6276:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
> Key: FLINK-6276
> URL: https://issues.apache.org/jira/browse/FLINK-6276
> Project: Flink
> Issue Type: Bug
> Components: API / DataSet
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Quite frequently when writing Flink code, I get the exception
> {{InvalidTypesException: Unknown Error. Type is null.}}
> A small example that triggers it is:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> public class TestMain {
>     @SafeVarargs
>     public static <K, V> DataSet<Tuple2<K, List<V>>> join(
>             V missingValuePlaceholder,
>             DataSet<Tuple2<K, V>>... datasets) {
>         DataSet<Tuple2<K, List<V>>> join = null;
>         for (int i = 0; i < datasets.length; i++) {
>             final int datasetIdx = i;
>             if (datasetIdx == 0) {
>                 join = datasets[datasetIdx]
>                         .map(t -> {
>                             List<V> initialList = new ArrayList<>();
>                             initialList.add(t.f1);
>                             return new Tuple2<>(t.f0, initialList);
>                         })
>                         .name("start join");
>             } else {
>                 join = join.coGroup(datasets[datasetIdx]) //
>                         .where(0).equalTo(0) //
>                         .with((Iterable<Tuple2<K, List<V>>> li,
>                                 Iterable<Tuple2<K, V>> ri,
>                                 Collector<Tuple2<K, List<V>>> out) -> {
>                             K key = null;
>                             List<V> vals = new ArrayList<>(datasetIdx + 1);
>                             Iterator<Tuple2<K, List<V>>> lIter =
>                                     li.iterator();
>                             if (!lIter.hasNext()) {
>                                 for (int j = 0; j < datasetIdx; j++) {
>                                     vals.add(missingValuePlaceholder);
>                                 }
>                             } else {
>                                 Tuple2<K, List<V>> lt = lIter.next();
>                                 key = lt.f0;
>                                 vals.addAll(lt.f1);
>                                 if (lIter.hasNext()) {
>                                     throw new RuntimeException(
>                                             "Got non-unique key: " + key);
>                                 }
>                             }
>                             Iterator<Tuple2<K, V>> rIter = ri.iterator();
>                             if (!rIter.hasNext()) {
>                                 vals.add(missingValuePlaceholder);
>                             } else {
>                                 Tuple2<K, V> rt = rIter.next();
>                                 key = rt.f0;
>                                 vals.add(rt.f1);
>                                 if (rIter.hasNext()) {
>                                     throw new RuntimeException(
>                                             "Got non-unique key: " + key);
>                                 }
>                             }
>                             out.collect(new Tuple2<K, List<V>>(key, vals));
>                         }) //
>                         .name("join #" + datasetIdx);
>             }
>         }
>         return join;
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env =
>                 ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<String, Integer>> x = //
>                 env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4),
>                         new Tuple2<>("c", 5));
>         DataSet<Tuple2<String, Integer>> y = //
>                 env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1),
>                         new Tuple2<>("d", 2));
>         DataSet<Tuple2<String, Integer>> z = //
>                 env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8),
>                         new Tuple2<>("e", 9));
>         System.out.println(join(-1, x, y, z).collect());
>     }
> }
> {code}
> The stacktrace that is triggered is:
> {noformat}
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The return type
> of function 'join(TestMain.java:23)' could not be determined automatically,
> due to type erasure. You can give type information hints by using the
> returns(...) method on the result of the transformation call, or by letting
> your function implement the 'ResultTypeQueryable' interface.
>     at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>     at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input
> mismatch: Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>     at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>     at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
>     ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Unknown Error. Type is null.
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>     ... 6 more
> {noformat}
> The code compiles fine, and typechecks. Maybe something is wrong with the
> code; but either way, Flink should report a better error message.
> A separate issue here is that the error message is being reported for the
> wrong function: the problem is not with the return type of
> {{join(TestMain.java:23)}}, it is some internal type (probably for a lambda
> or something) within the function. (It is the {{where}} clause that throws
> the exception.)
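
The exception text in the trace attributes the failure to type erasure, and
the cause chain runs through DataSet.map at TestMain.java:23. That suggests
the unresolved type belongs to the map lambda and only surfaces once where()
asks for the operator's type. Below is a minimal plain-Java sketch of that
erasure. It is not Flink's TypeExtractor: it uses only java.lang.reflect, and
ErasureDemo and its methods are hypothetical names introduced for
illustration. It shows that a lambda's class records no type arguments at all,
and that an anonymous class created inside a generic method records only the
type variable, not the type chosen by the caller.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical demo class; not part of Flink or of the reporter's code.
final class ErasureDemo {

    // Describes the first type argument that the function object's class
    // records for java.util.function.Function, or "erased" if none is recorded.
    public static String firstTypeArgument(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Function.class) {
                    Type arg = p.getActualTypeArguments()[0];
                    String kind = (arg instanceof TypeVariable) ? "type variable " : "class ";
                    return kind + arg.getTypeName();
                }
            }
        }
        return "erased";
    }

    // Anonymous class with concrete type arguments: both are kept in the class file.
    public static Function<String, Integer> concreteAnonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Anonymous class inside a generic method: only the variable V is recorded.
    public static <V> Function<V, List<V>> genericAnonymous() {
        return new Function<V, List<V>>() {
            @Override
            public List<V> apply(V v) {
                return Collections.singletonList(v);
            }
        };
    }

    // Lambda: the generated class implements the raw Function interface.
    public static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println(firstTypeArgument(concreteAnonymous())); // class java.lang.String
        System.out.println(firstTypeArgument(ErasureDemo.<Integer>genericAnonymous())); // type variable V
        System.out.println(firstTypeArgument(lambda())); // erased
    }
}
```

If erasure is indeed the cause here, the hints the message itself names
(calling returns(...) on the result of map, or implementing
ResultTypeQueryable) would need concrete type information for K and V supplied
by the caller of join, since neither is available inside the generic method.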
--
This message was sent by Atlassian Jira
(v8.20.1#820001)