[ https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhipeng Zhang reassigned FLINK-30933:
-------------------------------------

    Assignee: Zhipeng Zhang

> Result of join inside iterationBody loses max watermark
> -------------------------------------------------------
>
>                 Key: FLINK-30933
>                 URL: https://issues.apache.org/jira/browse/FLINK-30933
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>            Reporter: Zhipeng Zhang
>            Assignee: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.2.0
>
>
> Currently, if we execute a join inside an iteration body, the following
> program produces empty output. (The correct result should be a list
> containing {1, 2}.)
> {code:java}
> public class Test {
>     public static void main(String[] args) throws Exception {
>         Configuration config = new Configuration();
>         config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000000L);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(config);
>         env.setParallelism(1);
>         DataStream<Tuple2<Long, Integer>> input1 =
>                 env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
>         DataStream<Tuple2<Long, Long>> input2 =
>                 env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
>         DataStream<Tuple2<Long, Long>> iterationJoin =
>                 Iterations.iterateBoundedStreamsUntilTermination(
>                                 DataStreamList.of(input1),
>                                 ReplayableDataStreamList.replay(input2),
>                                 IterationConfig.newBuilder()
>                                         .setOperatorLifeCycle(
>                                                 IterationConfig.OperatorLifeCycle.PER_ROUND)
>                                         .build(),
>                                 new MyIterationBody())
>                         .get(0);
>         DataStream<Long> left = iterationJoin.map(x -> x.f0);
>         DataStream<Long> right = iterationJoin.map(x -> x.f0);
>         DataStream<Long> result =
>                 left.join(right)
>                         .where(x -> x)
>                         .equalTo(x -> x)
>                         .window(EndOfStreamWindows.get())
>                         .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);
>         List<Long> collectedResult =
>                 IteratorUtils.toList(result.executeAndCollect());
>         List<Long> expectedResult = Arrays.asList(1L, 2L);
>         compareResultCollections(expectedResult, collectedResult, Long::compareTo);
>     }
>
>     private static class MyIterationBody implements IterationBody {
>         @Override
>         public IterationBodyResult process(
>                 DataStreamList variableStreams, DataStreamList dataStreams) {
>             DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
>             DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
>             DataStream<Long> terminationCriteria =
>                     input1.flatMap(new TerminateOnMaxIter(1));
>             DataStream<Tuple2<Long, Long>> res =
>                     input1.join(input2)
>                             .where(x -> x.f0)
>                             .equalTo(x -> x.f0)
>                             .window(EndOfStreamWindows.get())
>                             .apply(
>                                     (JoinFunction<
>                                                     Tuple2<Long, Integer>,
>                                                     Tuple2<Long, Long>,
>                                                     Tuple2<Long, Long>>)
>                                             (t1, t2) -> t2);
>             return new IterationBodyResult(
>                     DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
>         }
>     }
> }
> {code}
>
> There are two possible reasons:
> * The timer in `HeadOperator` is not a daemon, so it does not exit even
> after the Flink job finishes.
> * The max watermark from the iteration body is lost.
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
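
The first possible reason listed in the issue (the `HeadOperator` timer not being a daemon) can be illustrated with a small standalone sketch. The issue does not show how `HeadOperator` creates its timer, so this uses plain `java.util.Timer` as a stand-in; the class name `DaemonTimerSketch` and its helper method are hypothetical and are not part of Flink.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only: not Flink code, and not how HeadOperator is known to work.
public class DaemonTimerSketch {

    // Creates a java.util.Timer with the requested daemon flag and reports
    // whether the thread that runs its tasks is a daemon thread.
    static boolean timerThreadIsDaemon(boolean daemon) {
        Timer timer = new Timer("sketch-timer", daemon);
        AtomicBoolean observed = new AtomicBoolean();
        CountDownLatch ran = new CountDownLatch(1);
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        observed.set(Thread.currentThread().isDaemon());
                        ran.countDown();
                    }
                },
                0L);
        try {
            if (!ran.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timer task did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Without cancel(), a non-daemon timer thread would keep the JVM alive
            // after main() returns, which is the kind of hang the issue suspects.
            timer.cancel();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("daemon=true  -> " + timerThreadIsDaemon(true));
        System.out.println("daemon=false -> " + timerThreadIsDaemon(false));
    }
}
```

If a timer like this is created without the daemon flag and never cancelled, the process stays up after the job's work is done; either marking it as a daemon or cancelling it on shutdown avoids that.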