[ https://issues.apache.org/jira/browse/FLINK-6276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960378#comment-15960378 ]

Chesnay Schepler commented on FLINK-6276:
-----------------------------------------

I believe the problem is your generic join method, which basically throws 
away all information about K and V. I'm not sure whether we can even fix this; 
[~twalthr], what do you think?
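
A possible workaround (a rough, untested sketch, not a confirmed fix): since the 
extractor cannot recover the erased K and V from the lambdas, the type information 
can be pulled from {{datasets[0].getType()}} and handed to the operator explicitly 
via {{returns(...)}}, as the exception message suggests. The class and variable 
names below are illustrative only:

{code}
// Rough sketch (untested): derive K and V from the first input's runtime type
// info and pass an explicit output type to the operator via returns(...).
// "JoinWithTypeHints" and the variable names are illustrative, not from the report.
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class JoinWithTypeHints {

    @SafeVarargs
    @SuppressWarnings("unchecked")
    public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
            DataSet<Tuple2<K, V>>... datasets) {
        // The lambdas cannot tell the TypeExtractor what K and V are (erasure),
        // but the input DataSet still carries that information at runtime.
        TupleTypeInfo<Tuple2<K, V>> inType = (TupleTypeInfo<Tuple2<K, V>>) datasets[0].getType();
        TypeInformation<K> keyType = inType.getTypeAt(0);
        TypeInformation<V> valueType = inType.getTypeAt(1);
        TypeInformation<Tuple2<K, List<V>>> outType =
                new TupleTypeInfo<>(keyType, new ListTypeInfo<>(valueType));

        return datasets[0]
                .map(t -> {
                    List<V> initial = new ArrayList<>();
                    initial.add(t.f1);
                    return new Tuple2<>(t.f0, initial);
                })
                .returns(outType) // explicit hint instead of lambda type extraction
                .name("start join");
        // The coGroup/with(...) steps of the original code would need the same
        // .returns(outType) hint on their results.
    }
}
{code}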

> InvalidTypesException: Unknown Error. Type is null.
> ---------------------------------------------------
>
>                 Key: FLINK-6276
>                 URL: https://issues.apache.org/jira/browse/FLINK-6276
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataSet API
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> Quite frequently when writing Flink code, I get the exception 
> {{InvalidTypesException: Unknown Error. Type is null.}} 
> A small example that triggers it is:
> {code}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> public class TestMain {
>     @SafeVarargs
>     public static <K, V> DataSet<Tuple2<K, List<V>>> join(V missingValuePlaceholder,
>             DataSet<Tuple2<K, V>>... datasets) {
>         DataSet<Tuple2<K, List<V>>> join = null;
>         for (int i = 0; i < datasets.length; i++) {
>             final int datasetIdx = i;
>             if (datasetIdx == 0) {
>                 join = datasets[datasetIdx]
>                        .map(t -> {
>                             List<V> initialList = new ArrayList<>();
>                             initialList.add(t.f1);
>                             return new Tuple2<>(t.f0, initialList);
>                         })
>                         .name("start join");
>             } else {
>                 join = join.coGroup(datasets[datasetIdx]) //
>                         .where(0).equalTo(0) //
>                         .with((Iterable<Tuple2<K, List<V>>> li, Iterable<Tuple2<K, V>> ri,
>                                 Collector<Tuple2<K, List<V>>> out) -> {
>                             K key = null;
>                             List<V> vals = new ArrayList<>(datasetIdx + 1);
>                             Iterator<Tuple2<K, List<V>>> lIter = li.iterator();
>                             if (!lIter.hasNext()) {
>                                 for (int j = 0; j < datasetIdx; j++) {
>                                     vals.add(missingValuePlaceholder);
>                                 }
>                             } else {
>                                 Tuple2<K, List<V>> lt = lIter.next();
>                                 key = lt.f0;
>                                 vals.addAll(lt.f1);
>                                 if (lIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             Iterator<Tuple2<K, V>> rIter = ri.iterator();
>                             if (!rIter.hasNext()) {
>                                 vals.add(missingValuePlaceholder);
>                             } else {
>                                 Tuple2<K, V> rt = rIter.next();
>                                 key = rt.f0;
>                                 vals.add(rt.f1);
>                                 if (rIter.hasNext()) {
>                                     throw new RuntimeException("Got non-unique key: " + key);
>                                 }
>                             }
>                             out.collect(new Tuple2<K, List<V>>(key, vals));
>                         }) //
>                         .name("join #" + datasetIdx);
>             }
>         }
>         return join;
>     }
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<Tuple2<String, Integer>> x = //
>                 env.fromElements(new Tuple2<>("a", 3), new Tuple2<>("b", 4), new Tuple2<>("c", 5));
>         DataSet<Tuple2<String, Integer>> y = //
>                 env.fromElements(new Tuple2<>("b", 0), new Tuple2<>("c", 1), new Tuple2<>("d", 2));
>         DataSet<Tuple2<String, Integer>> z = //
>                 env.fromElements(new Tuple2<>("c", 7), new Tuple2<>("d", 8), new Tuple2<>("e", 9));
>         System.out.println(join(-1, x, y, z).collect());
>     }
> }
> {code}
> The stacktrace that is triggered is:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'join(TestMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>       at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
>       at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets.where(CoGroupOperator.java:424)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:27)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.main(TestMain.java:74)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
>       at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
>       at com.rentlogic.buildingscores.flink.experimental.TestMain.join(TestMain.java:23)
>       ... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
>       at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
>       ... 6 more
> {noformat}
> The code compiles fine and typechecks. Maybe something is wrong with the code, 
> but either way Flink should report a better error message.
> A separate issue is that the error is reported for the wrong function: the 
> problem is not with the return type of {{join(TestMain.java:23)}} itself, but 
> with some internal type (probably for a lambda) inside the function. (It is 
> the {{where}} clause that actually throws the exception.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
