The problem is that the StreamIterationHead is never created, because only IterativeDataStream.transform(...) can create it. groupBy() on an IterativeDataStream does not call transform(), hence the NullPointerException in StreamGraph.addIterationTail() when closeWith() is called. Every DataStream method that is supported in iterations but does not call transform() should be overridden in IterativeDataStream so that it adds the iteration head.
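To make the mechanism concrete, here is a minimal, self-contained Java model of the failure and of the proposed override. All of the classes and methods in it (StreamGraph, DataStream, IterativeDataStream, FixedIterativeDataStream, ensureHead, buildsCleanly) are hypothetical, heavily simplified stand-ins, not Flink's actual classes or signatures. The model only mirrors what is described above: the head is registered from transform(), groupBy() bypasses transform(), and closeWith() then finds no head.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the bug; these are NOT Flink's real classes or signatures.
public class IterationHeadSketch {

    // Toy stream graph: remembers which iterations have a registered head.
    static class StreamGraph {
        final Map<Integer, String> iterationHeads = new HashMap<>();
        final List<String> edges = new ArrayList<>();

        void addIterationHead(int iterationId) {
            // Idempotent: registering the same head twice is harmless.
            iterationHeads.putIfAbsent(iterationId, "head-" + iterationId);
        }

        void addIterationTail(int iterationId) {
            String head = iterationHeads.get(iterationId);
            if (head == null) {
                // Stands in for the NullPointerException in the reported stack trace.
                throw new NullPointerException("no head registered for iteration " + iterationId);
            }
            edges.add(head + " <- tail");
        }
    }

    static class DataStream {
        final StreamGraph graph;

        DataStream(StreamGraph graph) { this.graph = graph; }

        // Ordinary operators such as map() are attached through transform().
        DataStream transform(String operatorName) {
            graph.edges.add(operatorName);
            return new DataStream(graph);
        }

        DataStream map() { return transform("map"); }

        // Like groupBy() in the report: changes partitioning without calling transform().
        DataStream groupBy(int field) {
            graph.edges.add("partition by field " + field);
            return new DataStream(graph);
        }
    }

    static class IterativeDataStream extends DataStream {
        final int iterationId;

        IterativeDataStream(StreamGraph graph, int iterationId) {
            super(graph);
            this.iterationId = iterationId;
        }

        void ensureHead() { graph.addIterationHead(iterationId); }

        // Only transform() registers the head, so groupBy() (inherited as-is) never does.
        @Override
        DataStream transform(String operatorName) {
            ensureHead();
            return super.transform(operatorName);
        }

        void closeWith(DataStream feedback) { graph.addIterationTail(iterationId); }
    }

    // The suggested fix: override the methods that bypass transform() so they add the head too.
    static class FixedIterativeDataStream extends IterativeDataStream {
        FixedIterativeDataStream(StreamGraph graph, int iterationId) { super(graph, iterationId); }

        @Override
        DataStream groupBy(int field) {
            ensureHead();
            return super.groupBy(field);
        }
    }

    // Builds iteration.groupBy(1).map() and closes the loop; returns false if closeWith() throws.
    public static boolean buildsCleanly(boolean withOverride) {
        StreamGraph graph = new StreamGraph();
        IterativeDataStream iteration = withOverride
                ? new FixedIterativeDataStream(graph, 1)
                : new IterativeDataStream(graph, 1);
        DataStream step = iteration.groupBy(1).map();
        try {
            iteration.closeWith(step);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without override: " + (buildsCleanly(false) ? "ok" : "NullPointerException"));
        System.out.println("with override:    " + (buildsCleanly(true) ? "ok" : "NullPointerException"));
    }
}
```

Running main prints NullPointerException for the first case and ok for the second. Making the head registration idempotent lets every overridden method call ensureHead() unconditionally, so the same pattern extends to any other method that does not go through transform().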
Peter

2015-05-07 17:11 GMT+02:00 Daniel Bali (JIRA) <j...@apache.org>:

> Daniel Bali created FLINK-1986:
> ----------------------------------
>
>              Summary: Group by fails on iterative data streams
>                  Key: FLINK-1986
>                  URL: https://issues.apache.org/jira/browse/FLINK-1986
>              Project: Flink
>           Issue Type: Bug
>           Components: Streaming
>             Reporter: Daniel Bali
>
>
> Hello!
>
> When I try to run a `groupBy` on an IterativeDataStream I get a
> NullPointerException. Here is the code that reproduces the issue:
>
> {code}
> public Test() throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment();
>
>     DataStream<Tuple2<Long, Long>> edges = env
>         .generateSequence(0, 7)
>         .map(new MapFunction<Long, Tuple2<Long, Long>>() {
>             @Override
>             public Tuple2<Long, Long> map(Long v) throws Exception {
>                 return new Tuple2<>(v, (v + 1));
>             }
>         });
>
>     IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
>
>     SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
>         .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>             @Override
>             public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>                 return tuple;
>             }
>         })
>         .split(new OutputSelector<Tuple2<Long, Long>>() {
>             @Override
>             public Iterable<String> select(Tuple2<Long, Long> tuple) {
>                 List<String> output = new ArrayList<>();
>                 output.add("iterate");
>                 return output;
>             }
>         });
>
>     iteration.closeWith(step.select("iterate"));
>
>     env.execute("Sandbox");
> }
> {code}
>
> Moving the groupBy before the iteration solves the issue, e.g. this works:
>
> {code}
> ... iteration = edges.groupBy(1).iterate();
> iteration.map(...)
> {code}
>
> Here is the stack trace:
>
> {code}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
>     at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
>     at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
>     at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> {code}
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)