[ 
https://issues.apache.org/jira/browse/FLINK-2470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14651780#comment-14651780
 ] 

Aljoscha Krettek commented on FLINK-2470:
-----------------------------------------

This is the code I was using. Notice that the loop hangs sooner if you feed in 
elements on the 9999 socket stream (which is broadcast). Also, when activating 
the other feedback stream and deactivating the broadcast feedback stream I 
didn't manage to make it hang.

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class IterationTest {

        public static void main(String[] args) throws Exception {

                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(4);

                DataStream<String> text = env.socketTextStream("localhost", 
9999).broadcast();
                DataStream<String> text2 = env.socketTextStream("localhost", 
9998).shuffle();
//              DataStream<String> union = text.union(text2);

                SplitDataStream<String> split = text.union(text2).split(new 
OutputSelector<String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<String> select(String value) {
                                if (value.startsWith("a")) {
                                        System.out.println("SELECT TO A");
                                        return Collections.singleton("a");
                                } else if (value.startsWith("b")) {
                                        System.out.println("SELECT TO B");
                                        return Collections.singleton("b");
                                } else {
                                        System.out.println("SELECT TO C");
                                        return Collections.singleton("c");
                                }
                        }
                });


                split.select("b").map(new MapFunction<String, String>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public String map(String value) throws Exception {
                                System.out.println("MAP B: " + value);
                                return value;
                        }
                }).print();

                split.select("c").map(new MapFunction<String, String>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public String map(String value) throws Exception {
                                System.out.println("MAP C: " + value);
                                return value;
                        }
                }).print();

                IterativeDataStream<Tuple2<String, Integer>> iteration = 
split.select("a").map(new MapFunction<String, String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public String map(String value) throws Exception {
                                System.out.println("MAP A: " + value);
                                return value;
                        }
                })
                                .map(new MapFunction<String, Tuple2<String, 
Integer>>() {
                                        private static final long 
serialVersionUID = 1L;

                                        @Override
                                        public Tuple2<String, Integer> 
map(String value) throws Exception {
                                                return new Tuple2<String, 
Integer>(value, 1);
                                        }
                                }).iterate();

                IterativeDataStream.ConnectedIterativeDataStream<Tuple2<String, 
Integer>, Tuple2<String, Integer>> coIter = iteration.<Tuple2<String, 
Integer>>withFeedbackType("Tuple2<String, Integer>");

                SingleOutputStreamOperator<Tuple2<String, Integer>, ?> iter1Map 
= coIter
                                .map(new CoMapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                                        private static final long 
serialVersionUID = 1L;

                                        @Override
                                        public Tuple2<String, Integer> 
map1(Tuple2<String, Integer> value) throws Exception {
                                                System.out.println("INITIAL 1" 
+ value);
                                                value.f1++;
                                                return value;
                                        }

                                        @Override
                                        public Tuple2<String, Integer> 
map2(Tuple2<String, Integer> value) throws Exception {
                                                System.out.println("FEEDBACK 1" 
 +value);
                                                value.f1++;
                                                return value;
                                        }
                                }).setParallelism(4);

                SingleOutputStreamOperator<Tuple2<String, Integer>, ?> iter2Map 
= coIter
                                .map(new CoMapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                                        private static final long 
serialVersionUID = 1L;

                                        @Override
                                        public Tuple2<String, Integer> 
map1(Tuple2<String, Integer> value) throws Exception {
                                                System.out.println("INITIAL 2 " 
+ value);
                                                value.f1++;
                                                return value;
                                        }

                                        @Override
                                        public Tuple2<String, Integer> 
map2(Tuple2<String, Integer> value) throws Exception {
                                                System.out.println("FEEDBACK 2 
"  +value);
                                                value.f1++;
                                                return value;
                                        }
                                }).setParallelism(4);

                SingleOutputStreamOperator<Tuple2<String, Integer>, ? extends 
SingleOutputStreamOperator<Tuple2<String, Integer>, ?>> map1 = iter1Map.map(
                                new MapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
                                        private static final long 
serialVersionUID = 1L;

                                        @Override
                                        public Tuple2<String, Integer> 
map(Tuple2<String, Integer> value) throws Exception {
                                                return value;
                                        }
                                }).broadcast();

                SingleOutputStreamOperator<Tuple2<String, Integer>, ? extends 
SingleOutputStreamOperator<Tuple2<String, Integer>, ?>> map2 = iter2Map.map(
                                new MapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>() {
                                        private static final long 
serialVersionUID = 1L;

                                        @Override
                                        public Tuple2<String, Integer> 
map(Tuple2<String, Integer> value) throws Exception {
                                                return value;
                                        }
                                }).rebalance();


                SplitDataStream<Tuple2<String, Integer>> iter1Split = 
map1.split(new OutputSelector<Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<String> select(Tuple2<String, Integer> 
value) {
                                if (value.f1 < 10) {
                                        return Collections.singleton("loop");
                                } else {
                                        return Collections.singleton("end");
                                }
                        }
                });

                SplitDataStream<Tuple2<String, Integer>> iter2Split = 
map2.split(new OutputSelector<Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<String> select(Tuple2<String, Integer> 
value) {
                                if (value.f1 < 10) {
                                        return Collections.singleton("loop");
                                } else {
                                        return Collections.singleton("end");
                                }
                        }
                });

                coIter.closeWith(iter1Split.select("loop"));
//              coIter.closeWith(iter2Split.select("loop"));

                iter1Split.select("end").print();
                iter2Split.select("end").print();

                System.out.println(env.getExecutionPlan());
                env.execute("IterationTest");
        }

}
{code}

> Stream Iteration can Hang after some Data
> -----------------------------------------
>
>                 Key: FLINK-2470
>                 URL: https://issues.apache.org/jira/browse/FLINK-2470
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>
> I was trying out a (rather contrieved) Co-Iteration example job and at some 
> point the elements are not emitted fed back (or emitted to the sink) anymore.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to