Zhipeng Zhang created FLINK-30933:
-------------------------------------

             Summary: Result of join inside iterationBody loses max watermark
                 Key: FLINK-30933
                 URL: https://issues.apache.org/jira/browse/FLINK-30933
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
            Reporter: Zhipeng Zhang
             Fix For: ml-2.2.0
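Currently, if a join is executed inside an iteration body, the following program produces empty output. The correct output is a list containing 1 and 2.

{code:java}
import static org.apache.flink.test.util.TestBaseUtils.compareResultCollections;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBody;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.IterationConfig;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.iteration.ReplayableDataStreamList;
import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
import org.apache.flink.ml.common.iteration.TerminateOnMaxIter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class Test {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Very long heartbeat timeout (10,000,000 ms).
        config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000000L);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(1);

        DataStream<Tuple2<Long, Integer>> input1 =
                env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
        DataStream<Tuple2<Long, Long>> input2 =
                env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));

        // Run the join inside a bounded iteration; operators are re-created every round.
        DataStream<Tuple2<Long, Long>> iterationJoin =
                Iterations.iterateBoundedStreamsUntilTermination(
                                DataStreamList.of(input1),
                                ReplayableDataStreamList.replay(input2),
                                IterationConfig.newBuilder()
                                        .setOperatorLifeCycle(
                                                IterationConfig.OperatorLifeCycle.PER_ROUND)
                                        .build(),
                                new MyIterationBody())
                        .get(0);

        // Outside the iteration: self-join the iteration output on its key.
        DataStream<Long> left = iterationJoin.map(x -> x.f0);
        DataStream<Long> right = iterationJoin.map(x -> x.f0);
        DataStream<Long> result =
                left.join(right)
                        .where(x -> x)
                        .equalTo(x -> x)
                        .window(EndOfStreamWindows.get())
                        .apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);

        List<Long> collectedResult = IteratorUtils.toList(result.executeAndCollect());
        List<Long> expectedResult = Arrays.asList(1L, 2L);
        compareResultCollections(expectedResult, collectedResult, Long::compareTo);
    }

    private static class MyIterationBody implements IterationBody {
        @Override
        public IterationBodyResult process(
                DataStreamList variableStreams, DataStreamList dataStreams) {
            DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
            DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);

            // Terminate the iteration after the configured maximum number of rounds.
            DataStream<Long> terminationCriteria = input1.flatMap(new TerminateOnMaxIter(1));

            // Inside the iteration: join on f0 and emit the right-hand record.
            DataStream<Tuple2<Long, Long>> res =
                    input1.join(input2)
                            .where(x -> x.f0)
                            .equalTo(x -> x.f0)
                            .window(EndOfStreamWindows.get())
                            .apply(
                                    (JoinFunction<
                                                    Tuple2<Long, Integer>,
                                                    Tuple2<Long, Long>,
                                                    Tuple2<Long, Long>>)
                                            (t1, t2) -> t2);

            return new IterationBodyResult(
                    DataStreamList.of(input1), DataStreamList.of(res), terminationCriteria);
        }
    }
}
{code}

There are two possible causes:
* The timer in `HeadOperator` does not run in a daemon thread, so it does not exit even after the Flink job finishes.
* The max watermark emitted from the iteration body is lost.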
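To make the first possible cause concrete: the report does not show how `HeadOperator` creates its timer, so the following is only a generic JDK illustration, not Flink code. A JVM exits only when all non-daemon threads have finished, and a `java.util.Timer` runs its tasks on a background thread whose daemon flag is chosen in the constructor. The class `TimerDaemonDemo` and its method are hypothetical names used only for this sketch.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class (not part of Flink): reports whether the thread
// that executes a java.util.Timer's tasks is a daemon thread.
public class TimerDaemonDemo {

    static boolean timerThreadIsDaemon(boolean isDaemon) throws InterruptedException {
        // Timer(String name, boolean isDaemon) creates the background thread
        // with the requested daemon flag.
        Timer timer = new Timer("demo-timer", isDaemon);
        final boolean[] observed = new boolean[1];
        final CountDownLatch done = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Runs on the timer's own thread.
                observed[0] = Thread.currentThread().isDaemon();
                done.countDown();
            }
        }, 0L);
        done.await(); // the latch also makes observed[0] visible to this thread
        // Without cancel(), a non-daemon timer thread would keep the JVM alive.
        timer.cancel();
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("non-daemon timer thread is daemon? " + timerThreadIsDaemon(false));
        System.out.println("daemon timer thread is daemon?     " + timerThreadIsDaemon(true));
    }
}
```

If a component creates a non-daemon timer and never cancels it, the process can stay alive after the job itself is done, which matches the symptom described in the first bullet.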
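The second possible cause can be illustrated with a toy model. Assuming, as the issue title suggests, that a window built with `EndOfStreamWindows` emits its results only once the max watermark (`Long.MAX_VALUE`) arrives, a join that never receives that watermark buffers its input and outputs nothing. The class below is a hypothetical plain-Java model of that behavior, not the Flink window API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not the Flink API): a single window spanning the
// whole stream that buffers records and fires only on the max watermark.
public class EndOfStreamWindowModel {
    private final List<Long> buffer = new ArrayList<>();
    private final List<Long> output = new ArrayList<>();

    void processElement(long value) {
        buffer.add(value);
    }

    void processWatermark(long watermark) {
        // Only the max watermark signals end of input and triggers the window.
        if (watermark == Long.MAX_VALUE) {
            output.addAll(buffer);
            buffer.clear();
        }
    }

    static List<Long> run(boolean deliverMaxWatermark) {
        EndOfStreamWindowModel window = new EndOfStreamWindowModel();
        window.processElement(1L);
        window.processElement(2L);
        window.processWatermark(0L); // an ordinary watermark does not fire the window
        if (deliverMaxWatermark) {
            window.processWatermark(Long.MAX_VALUE);
        }
        return window.output;
    }

    public static void main(String[] args) {
        System.out.println("with max watermark:    " + run(true));
        System.out.println("without max watermark: " + run(false));
    }
}
```

In this model the records 1 and 2 are emitted only when the max watermark is delivered; if it is lost, the result is empty, just like the output of the reproduction program.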