[ https://issues.apache.org/jira/browse/FLINK-1986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14539589#comment-14539589 ]

Péter Szabó commented on FLINK-1986:
------------------------------------

The problem is that the StreamIterationHead is not created, because only
IterativeDataStream.transform(...) can create it. groupBy() on an
IterativeDataStream does not call transform(), hence the exception. All
methods of DataStream that are supported in iterations but do not call
transform() should be overridden in IterativeDataStream so that they add the
iteration head.
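To make the failure mode concrete, here is a toy Java model of the mechanism described above, assuming a greatly simplified stream graph. None of these classes are Flink's: `Graph`, `Stream`, `IterativeStream`, `PatchedIterativeStream` and `ensureHead()` are hypothetical stand-ins for StreamGraph, DataStream and IterativeDataStream. In the model, only transform() creates the iteration head, so groupBy() followed by closeWith() finds a null head; overriding groupBy() in the iterative subclass is the fix proposed in the comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the FLINK-1986 bug; these are NOT Flink classes.
public class IterationHeadSketch {

    static class Graph {
        final List<String> nodes = new ArrayList<>();
        String iterationHead; // stays null until some operator creates it

        void addIterationTail() {
            // Loosely mirrors the NPE raised in StreamGraph.addIterationTail
            if (iterationHead == null) {
                throw new NullPointerException("iteration head was never created");
            }
            nodes.add("tail");
        }
    }

    static class Stream {
        final Graph graph;
        Stream(Graph graph) { this.graph = graph; }

        // Operators added through transform() are registered in the graph.
        Stream transform(String operator) {
            graph.nodes.add(operator);
            return new Stream(graph);
        }

        // groupBy() only changes partitioning and does not call transform().
        Stream groupBy(int field) {
            return new Stream(graph);
        }
    }

    // Reported behavior: only transform() creates the iteration head.
    static class IterativeStream extends Stream {
        IterativeStream(Graph graph) { super(graph); }

        void ensureHead() {
            if (graph.iterationHead == null) {
                graph.iterationHead = "head";
                graph.nodes.add("head");
            }
        }

        @Override
        Stream transform(String operator) {
            ensureHead();
            return super.transform(operator);
        }

        void closeWith() { graph.addIterationTail(); }
    }

    // Proposed fix: also override methods that bypass transform().
    static class PatchedIterativeStream extends IterativeStream {
        PatchedIterativeStream(Graph graph) { super(graph); }

        @Override
        Stream groupBy(int field) {
            ensureHead();
            return super.groupBy(field);
        }
    }

    public static void main(String[] args) {
        Graph g = new Graph();
        PatchedIterativeStream iteration = new PatchedIterativeStream(g);
        iteration.groupBy(1).transform("map");
        iteration.closeWith();
        System.out.println(g.nodes); // prints [head, map, tail]
    }
}
```

Swapping `PatchedIterativeStream` for `IterativeStream` in `main` makes closeWith() throw a NullPointerException, matching the behavior in the report.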

> Group by fails on iterative data streams
> ----------------------------------------
>
>                 Key: FLINK-1986
>                 URL: https://issues.apache.org/jira/browse/FLINK-1986
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Daniel Bali
>              Labels: iteration, streaming
>
> Hello!
> When I try to run a `groupBy` on an IterativeDataStream I get a 
> NullPointerException. Here is the code that reproduces the issue:
> {code}
> public Test() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>     DataStream<Tuple2<Long, Long>> edges = env
>             .generateSequence(0, 7)
>             .map(new MapFunction<Long, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Long v) throws Exception {
>                     return new Tuple2<>(v, (v + 1));
>                 }
>             });
>     IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
>     SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
>             .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>                     return tuple;
>                 }
>             })
>             .split(new OutputSelector<Tuple2<Long, Long>>() {
>                 @Override
>                 public Iterable<String> select(Tuple2<Long, Long> tuple) {
>                     List<String> output = new ArrayList<>();
>                     output.add("iterate");
>                     return output;
>                 }
>             });
>     iteration.closeWith(step.select("iterate"));
>     env.execute("Sandbox");
> }
> {code}
> Moving the groupBy before the iteration solves the issue; for example, this works:
> {code}
> ... iteration = edges.groupBy(1).iterate();
> iteration.map(...)
> {code}
> Here is the stack trace:
> {code}
> Exception in thread "main" java.lang.NullPointerException
>       at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
>       at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
>       at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
>       at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:601)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
