[ https://issues.apache.org/jira/browse/FLINK-1986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14538328#comment-14538328 ]
Stephan Ewen commented on FLINK-1986:
-------------------------------------
[~gyfora] and [~senorcarbone] Are you using co-location constraints to make
sure that the head and tail of an iteration are co-located? Without them,
co-location is not guaranteed, but the backchannel broker requires it.
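
To illustrate why the broker would need co-location, here is a toy model, assuming the backchannel is an in-memory handover that lives inside a single JVM. Every name in it (BackchannelSketch, BROKER, openHead, feedBack) is hypothetical and none of it is taken from the Flink code base. It only shows that a tail can hand records back when a head has registered a queue in the same process, and finds nothing otherwise.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Toy model only: hypothetical names, not Flink's actual classes.
public class BackchannelSketch {

    // One registry per JVM. A head and a tail can only meet here if they
    // run in the same process, which is what co-location would ensure.
    static final ConcurrentMap<String, BlockingQueue<Long>> BROKER =
            new ConcurrentHashMap<>();

    // Head side: register the feedback queue under the iteration's id.
    static BlockingQueue<Long> openHead(String iterationId) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(16);
        BROKER.put(iterationId, queue);
        return queue;
    }

    // Tail side: look up the queue and hand a record back to the head.
    // Returns false if no head registered under this id in this JVM,
    // or if the queue stayed full for one second.
    static boolean feedBack(String iterationId, long value)
            throws InterruptedException {
        BlockingQueue<Long> queue = BROKER.get(iterationId);
        if (queue == null) {
            return false;
        }
        return queue.offer(value, 1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Long> headQueue = openHead("iteration-0");

        // Head and tail share the registry: the record arrives.
        System.out.println(feedBack("iteration-0", 42L)); // prints "true"
        System.out.println(headQueue.poll());             // prints "42"

        // No head registered under this id: the tail finds nothing.
        System.out.println(feedBack("iteration-1", 7L));  // prints "false"
    }
}
```

In this model, a tail scheduled in a different JVM would see an empty registry, which corresponds to the third call above.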
> Group by fails on iterative data streams
> ----------------------------------------
>
> Key: FLINK-1986
> URL: https://issues.apache.org/jira/browse/FLINK-1986
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Reporter: Daniel Bali
> Labels: iteration, streaming
>
> Hello!
> When I try to run a `groupBy` on an IterativeDataStream I get a
> NullPointerException. Here is the code that reproduces the issue:
> {code}
> public Test() throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironment();
>     DataStream<Tuple2<Long, Long>> edges = env
>             .generateSequence(0, 7)
>             .map(new MapFunction<Long, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Long v) throws Exception {
>                     return new Tuple2<>(v, (v + 1));
>                 }
>             });
>     IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
>     SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
>             .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple)
>                         throws Exception {
>                     return tuple;
>                 }
>             })
>             .split(new OutputSelector<Tuple2<Long, Long>>() {
>                 @Override
>                 public Iterable<String> select(Tuple2<Long, Long> tuple) {
>                     List<String> output = new ArrayList<>();
>                     output.add("iterate");
>                     return output;
>                 }
>             });
>     iteration.closeWith(step.select("iterate"));
>     env.execute("Sandbox");
> }
> {code}
> Moving the groupBy before the iteration solves the issue. For example, this works:
> {code}
> ... iteration = edges.groupBy(1).iterate();
> iteration.map(...)
> {code}
> Here is the stack trace:
> {code}
> Exception in thread "main" java.lang.NullPointerException
> at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
> at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
> at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
> at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> {code}