[ https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhipeng Zhang updated FLINK-30933: ---------------------------------- Description: Currently if we execute a join inside an iteration body, the following program produces empty output. (The correct result should be a list containing \{2}.) {code:java} public class Test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Tuple2<Long, Integer>> input1 = env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); DataStream<Tuple2<Long, Long>> input2 = env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); DataStream<Tuple2<Long, Long>> iterationJoin = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input1), ReplayableDataStreamList.replay(input2), IterationConfig.newBuilder() .setOperatorLifeCycle( IterationConfig.OperatorLifeCycle.PER_ROUND) .build(), new MyIterationBody()) .get(0); DataStream<Long> left = iterationJoin.map(x -> x.f0); DataStream<Long> right = iterationJoin.map(x -> x.f1); DataStream<Long> result = left.join(right) .where(x -> x) .equalTo(x -> x) .window(EndOfStreamWindows.get()) .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1); List<Long> collectedResult = IteratorUtils.toList(result.executeAndCollect()); List<Long> expectedResult = Arrays.asList(2L); compareResultCollections(expectedResult, collectedResult, Long::compareTo); } private static class MyIterationBody implements IterationBody { @Override public IterationBodyResult process( DataStreamList variableStreams, DataStreamList dataStreams) { DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0); DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0); DataStream<Long> terminationCriteria = input1.flatMap(new TerminateOnMaxIter(1)); DataStream<Tuple2<Long, Long>> res = input1.join(input2) .where(x -> x.f0) .equalTo(x -> x.f0) .window(EndOfStreamWindows.get()) .apply( new JoinFunction< Tuple2<Long, Integer>, Tuple2<Long, Long>, 
Tuple2<Long, Long>>() { @Override public Tuple2<Long, Long> join( Tuple2<Long, Integer> longIntegerTuple2, Tuple2<Long, Long> longLongTuple2) throws Exception { return longLongTuple2; } }); return new IterationBodyResult( DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria); } } } {code} There are two possible reasons: * The timer in `HeadOperator` is not a daemon process and it does not exit even after the Flink job finishes. * The max watermark from the iteration body is missed. was: Currently if we execute a join inside an iteration body, the following program produces empty output. (The correct result should be a list containing \{1, 2}.) {code:java} public class Test { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000000L); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setParallelism(1); DataStream<Tuple2<Long, Integer>> input1 = env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); DataStream<Tuple2<Long, Long>> input2 = env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); DataStream<Tuple2<Long, Long>> iterationJoin = Iterations.iterateBoundedStreamsUntilTermination( DataStreamList.of(input1), ReplayableDataStreamList.replay(input2), IterationConfig.newBuilder() .setOperatorLifeCycle( IterationConfig.OperatorLifeCycle.PER_ROUND) .build(), new MyIterationBody()) .get(0); DataStream<Long> left = iterationJoin.map(x -> x.f0); DataStream<Long> right = iterationJoin.map(x -> x.f0); DataStream<Long> result = left.join(right) .where(x -> x) .equalTo(x -> x) .window(EndOfStreamWindows.get()) .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1); List<Long> collectedResult = IteratorUtils.toList(result.executeAndCollect()); List<Long> expectedResult = Arrays.asList(1L, 2L); compareResultCollections(expectedResult, collectedResult, Long::compareTo); } private static class MyIterationBody 
implements IterationBody { @Override public IterationBodyResult process( DataStreamList variableStreams, DataStreamList dataStreams) { DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0); DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0); DataStream<Long> terminationCriteria = input1.flatMap(new TerminateOnMaxIter(1)); DataStream<Tuple2<Long, Long>> res = input1.join(input2) .where(x -> x.f0) .equalTo(x -> x.f0) .window(EndOfStreamWindows.get()) .apply( (JoinFunction< Tuple2<Long, Integer>, Tuple2<Long, Long>, Tuple2<Long, Long>>) (t1, t2) -> t2); return new IterationBodyResult( DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria); } } } {code} There are two possible reasons: * The timer in `HeadOperator` is not a daemon process and it does not exit even after the Flink job finishes. * The max watermark from the iteration body is missed. > Result of join inside iterationBody loses max watermark > ------------------------------------------------------- > > Key: FLINK-30933 > URL: https://issues.apache.org/jira/browse/FLINK-30933 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning > Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0 > Reporter: Zhipeng Zhang > Assignee: Zhipeng Zhang > Priority: Major > Fix For: ml-2.2.0 > > > Currently if we execute a join inside an iteration body, the following > program produces empty output. (The correct result should be a list > containing \{2}.) 
> {code:java} > public class Test { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > DataStream<Tuple2<Long, Integer>> input1 = > env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2)); > DataStream<Tuple2<Long, Long>> input2 = > env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L)); > DataStream<Tuple2<Long, Long>> iterationJoin = > Iterations.iterateBoundedStreamsUntilTermination( > DataStreamList.of(input1), > ReplayableDataStreamList.replay(input2), > IterationConfig.newBuilder() > .setOperatorLifeCycle( > > IterationConfig.OperatorLifeCycle.PER_ROUND) > .build(), > new MyIterationBody()) > .get(0); > DataStream<Long> left = iterationJoin.map(x -> x.f0); > DataStream<Long> right = iterationJoin.map(x -> x.f1); > DataStream<Long> result = > left.join(right) > .where(x -> x) > .equalTo(x -> x) > .window(EndOfStreamWindows.get()) > .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> > l1); > List<Long> collectedResult = > IteratorUtils.toList(result.executeAndCollect()); > List<Long> expectedResult = Arrays.asList(2L); > compareResultCollections(expectedResult, collectedResult, > Long::compareTo); > } > private static class MyIterationBody implements IterationBody { > @Override > public IterationBodyResult process( > DataStreamList variableStreams, DataStreamList dataStreams) { > DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0); > DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0); > DataStream<Long> terminationCriteria = input1.flatMap(new > TerminateOnMaxIter(1)); > DataStream<Tuple2<Long, Long>> res = > input1.join(input2) > .where(x -> x.f0) > .equalTo(x -> x.f0) > .window(EndOfStreamWindows.get()) > .apply( > new JoinFunction< > Tuple2<Long, Integer>, > Tuple2<Long, Long>, > Tuple2<Long, Long>>() { > @Override > public Tuple2<Long, Long> join( > Tuple2<Long, Integer> > longIntegerTuple2, > Tuple2<Long, Long> 
> longLongTuple2) > throws Exception { > return longLongTuple2; > } > }); > return new IterationBodyResult( > DataStreamList.of(input1), DataStreamList.of(res), > terminationCriteria); > } > } > } > {code} > > There are two possible reasons: > * The timer in `HeadOperator` is not a daemon process and it does not exit > even after the Flink job finishes. > * The max watermark from the iteration body is missed. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)