[ 
https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-30933:
----------------------------------
    Description: 
Currently, if we execute a join inside an iteration body, the following program
produces empty output, whereas the correct result should be a list containing \{2}.
{code:java}
/**
 * Reproducer for FLINK-30933 ("Result of join inside iterationBody loses max watermark").
 *
 * <p>The program joins two bounded streams inside an iteration body, then joins the iteration
 * output with itself outside the iteration. The reported symptom is empty output instead of
 * the expected [2].
 *
 * <p>Why [2] (assuming a single round contributes output, as expectedResult implies): inside
 * the body, input1 (1,1),(2,2) is joined with input2 (1,2),(2,3) on f0 and the input2 record
 * is kept, giving (1,2),(2,3). Outside, the f0 values [1, 2] are joined by equality with the
 * f1 values [2, 3], which matches only 2.
 */
public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        // Iterate with input1 as the variable stream and input2 as a replayed data stream.
        // .get(0) takes the body's first output stream, i.e. the join result `res`.
        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                
IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        // Join the first field of each iteration output against the second field, by value.
        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f1);
        // NOTE(review): an end-of-stream window presumably fires only when the max watermark
        // arrives. If that watermark is lost (see the issue title), this join emits nothing.
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult = 
IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(2L);
        // compareResultCollections is a test helper that is not shown in this snippet.
        compareResultCollections(expectedResult, collectedResult, 
Long::compareTo);
    }

    /**
     * Iteration body: joins the variable stream (input1) with the replayed data stream (input2)
     * on f0 and emits the matched input2 records as the body's output.
     */
    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            // NOTE(review): TerminateOnMaxIter(1) presumably ends the iteration after one
            // round. Confirm.
            DataStream<Long> terminationCriteria = input1.flatMap(new 
TerminateOnMaxIter(1));

            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    new JoinFunction<
                                            Tuple2<Long, Integer>,
                                            Tuple2<Long, Long>,
                                            Tuple2<Long, Long>>() {
                                        @Override
                                        public Tuple2<Long, Long> join(
                                                Tuple2<Long, Integer> 
longIntegerTuple2,
                                                Tuple2<Long, Long> 
longLongTuple2)
                                                throws Exception {
                                            // Keep only the input2 record of each matched pair.
                                            return longLongTuple2;
                                        }
                                    });

            // input1 is passed back as the variable stream. res is the output stream; its
            // type matches iterationJoin in main.
            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), 
terminationCriteria);
        }
    }
}
{code}
 

There are two possible reasons:
 * The timer in `HeadOperator` is not a daemon thread, so it does not exit even after the Flink job finishes.
 * The max watermark from the iteration body is lost.

 

 

  was:
Currently, if we execute a join inside an iteration body, the following program
produces empty output, whereas the correct result should be a list containing \{1, 2}.
{code:java}
/**
 * Earlier version of the FLINK-30933 reproducer: a window join inside an iteration body whose
 * output is then self-joined outside the iteration. The reported symptom is empty output
 * instead of the expected [1, 2].
 *
 * <p>Why [1, 2] (assuming a single round contributes output, as expectedResult implies): inside
 * the body, input1 (1,1),(2,2) is joined with input2 (1,2),(2,3) on f0 and the input2 record
 * is kept, giving (1,2),(2,3). Outside, both sides map to f0 ([1, 2]), so the equality join
 * matches 1 and 2.
 */
public class Test {

    public static void main(String[] args) throws Exception {
        // NOTE(review): very large heartbeat timeout, presumably so a hanging job is not
        // failed by heartbeat loss while being debugged. Confirm.
        Configuration config = new Configuration();
        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000000L);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));

        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        // Iterate with input1 as the variable stream and input2 as a replayed data stream.
        // .get(0) takes the body's first output stream, i.e. the join result `res`.
        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                
IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        // Both sides project f0, so this is a self-join of the iteration output on f0.
        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f0);
        // NOTE(review): an end-of-stream window presumably fires only when the max watermark
        // arrives. If that watermark is lost, this join emits nothing.
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult = 
IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(1L, 2L);
        // compareResultCollections is a test helper that is not shown in this snippet.
        compareResultCollections(expectedResult, collectedResult, 
Long::compareTo);
    }

    /**
     * Iteration body: joins the variable stream (input1) with the replayed data stream (input2)
     * on f0 and emits the matched input2 records as the body's output.
     */
    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            // NOTE(review): TerminateOnMaxIter(1) presumably ends the iteration after one
            // round. Confirm.
            DataStream<Long> terminationCriteria = input1.flatMap(new 
TerminateOnMaxIter(1));

            // The lambda keeps only the input2 record (t2) of each matched pair.
            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    (JoinFunction<
                                                    Tuple2<Long, Integer>,
                                                    Tuple2<Long, Long>,
                                                    Tuple2<Long, Long>>)
                                            (t1, t2) -> t2);

            // input1 is passed back as the variable stream. res is the output stream; its
            // type matches iterationJoin in main.
            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), 
terminationCriteria);
        }
    }
}
 {code}
 

There are two possible reasons:
 * The timer in `HeadOperator` is not a daemon thread, so it does not exit even after the Flink job finishes.
 * The max watermark from the iteration body is lost.

 

 


> Result of join inside iterationBody loses max watermark
> -------------------------------------------------------
>
>                 Key: FLINK-30933
>                 URL: https://issues.apache.org/jira/browse/FLINK-30933
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>            Reporter: Zhipeng Zhang
>            Assignee: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.2.0
>
>
> Currently, if we execute a join inside an iteration body, the following
> program produces empty output, whereas the correct result should be a list
> containing \{2}.
> {code:java}
> public class Test {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         DataStream<Tuple2<Long, Integer>> input1 =
>                 env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
>         DataStream<Tuple2<Long, Long>> input2 =
>                 env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
>         DataStream<Tuple2<Long, Long>> iterationJoin =
>                 Iterations.iterateBoundedStreamsUntilTermination(
>                                 DataStreamList.of(input1),
>                                 ReplayableDataStreamList.replay(input2),
>                                 IterationConfig.newBuilder()
>                                         .setOperatorLifeCycle(
>                                                 
> IterationConfig.OperatorLifeCycle.PER_ROUND)
>                                         .build(),
>                                 new MyIterationBody())
>                         .get(0);
>         DataStream<Long> left = iterationJoin.map(x -> x.f0);
>         DataStream<Long> right = iterationJoin.map(x -> x.f1);
>         DataStream<Long> result =
>                 left.join(right)
>                         .where(x -> x)
>                         .equalTo(x -> x)
>                         .window(EndOfStreamWindows.get())
>                         .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> 
> l1);
>         List<Long> collectedResult = 
> IteratorUtils.toList(result.executeAndCollect());
>         List<Long> expectedResult = Arrays.asList(2L);
>         compareResultCollections(expectedResult, collectedResult, 
> Long::compareTo);
>     }
>     private static class MyIterationBody implements IterationBody {
>         @Override
>         public IterationBodyResult process(
>                 DataStreamList variableStreams, DataStreamList dataStreams) {
>             DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
>             DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
>             DataStream<Long> terminationCriteria = input1.flatMap(new 
> TerminateOnMaxIter(1));
>             DataStream<Tuple2<Long, Long>> res =
>                     input1.join(input2)
>                             .where(x -> x.f0)
>                             .equalTo(x -> x.f0)
>                             .window(EndOfStreamWindows.get())
>                             .apply(
>                                     new JoinFunction<
>                                             Tuple2<Long, Integer>,
>                                             Tuple2<Long, Long>,
>                                             Tuple2<Long, Long>>() {
>                                         @Override
>                                         public Tuple2<Long, Long> join(
>                                                 Tuple2<Long, Integer> 
> longIntegerTuple2,
>                                                 Tuple2<Long, Long> 
> longLongTuple2)
>                                                 throws Exception {
>                                             return longLongTuple2;
>                                         }
>                                     });
>             return new IterationBodyResult(
>                     DataStreamList.of(input1), DataStreamList.of(res), 
> terminationCriteria);
>         }
>     }
> }
> {code}
>  
> There are two possible reasons:
>  * The timer in `HeadOperator` is not a daemon thread, so it does not exit
> even after the Flink job finishes.
>  * The max watermark from the iteration body is lost.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to