[
https://issues.apache.org/jira/browse/FLINK-30933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhipeng Zhang updated FLINK-30933:
----------------------------------
Description:
Currently, if we execute a join inside an iteration body, the following program
produces an empty output, whereas the correct result is the list \{2}.
{code:java}
/**
 * Reproduction for FLINK-30933: a window join executed inside an iteration body, whose output is
 * then window-joined again outside the iteration. The expected collected result is [2]; the issue
 * reports an empty result instead.
 */
public class Test {
/**
 * Builds the pipeline, executes it, and compares the collected join output with the expected
 * result.
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Variable stream fed into the iteration: (1, 1), (2, 2).
DataStream<Tuple2<Long, Integer>> input1 =
env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
// Data stream handed to the iteration through ReplayableDataStreamList.replay: (1, 2), (2, 3).
DataStream<Tuple2<Long, Long>> input2 =
env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
// Runs MyIterationBody with a PER_ROUND operator lifecycle. get(0) takes the first output
// stream of the iteration -- presumably the join result `res` returned by the body; confirm.
DataStream<Tuple2<Long, Long>> iterationJoin =
Iterations.iterateBoundedStreamsUntilTermination(
DataStreamList.of(input1),
ReplayableDataStreamList.replay(input2),
IterationConfig.newBuilder()
.setOperatorLifeCycle(
IterationConfig.OperatorLifeCycle.PER_ROUND)
.build(),
new MyIterationBody())
.get(0);
// Join the f0 values against the f1 values of the iteration output; only values present on
// both sides are emitted. If the body emits (1, 2) and (2, 3), the only common value is 2.
DataStream<Long> left = iterationJoin.map(x -> x.f0);
DataStream<Long> right = iterationJoin.map(x -> x.f1);
// NOTE(review): EndOfStreamWindows presumably fires only at end of input, so this join depends
// on the max watermark reaching it -- the watermark the issue suspects is lost.
DataStream<Long> result =
left.join(right)
.where(x -> x)
.equalTo(x -> x)
.window(EndOfStreamWindows.get())
.apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);
// NOTE(review): the iterator returned by executeAndCollect() is not explicitly closed here.
List<Long> collectedResult =
IteratorUtils.toList(result.executeAndCollect());
List<Long> expectedResult = Arrays.asList(2L);
compareResultCollections(expectedResult, collectedResult,
Long::compareTo);
}
/**
 * Iteration body that window-joins the variable stream with the data stream on field f0 and
 * emits the data-stream tuple for every match.
 */
private static class MyIterationBody implements IterationBody {
@Override
public IterationBodyResult process(
DataStreamList variableStreams, DataStreamList dataStreams) {
DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
// Presumably ends the iteration after a single round (TerminateOnMaxIter is not shown here).
DataStream<Long> terminationCriteria = input1.flatMap(new
TerminateOnMaxIter(1));
// Inner join on f0 inside an end-of-stream window; each match returns the input2 tuple.
// NOTE(review): an anonymous class rather than a lambda is presumably used so that Flink's
// type extraction can see the generic Tuple2 types -- confirm.
DataStream<Tuple2<Long, Long>> res =
input1.join(input2)
.where(x -> x.f0)
.equalTo(x -> x.f0)
.window(EndOfStreamWindows.get())
.apply(
new JoinFunction<
Tuple2<Long, Integer>,
Tuple2<Long, Long>,
Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> join(
Tuple2<Long, Integer>
longIntegerTuple2,
Tuple2<Long, Long>
longLongTuple2)
throws Exception {
return longLongTuple2;
}
});
// Presumably (feedback variable streams, output streams, termination criteria): input1 is
// fed back unchanged and res becomes the iteration output -- confirm against the API.
return new IterationBodyResult(
DataStreamList.of(input1), DataStreamList.of(res),
terminationCriteria);
}
}
}
{code}
There are two possible causes:
* The timer in `HeadOperator` is not a daemon, so it does not exit even after
the Flink job finishes.
* The max watermark from the iteration body is lost.
was:
Currently, if we execute a join inside an iteration body, the following program
produces an empty output, whereas the correct result is the list \{1, 2}.
{code:java}
/**
 * Earlier reproduction for FLINK-30933: a window join executed inside an iteration body, whose
 * output is then self-joined on f0 outside the iteration. The expected collected result is
 * [1, 2]; the issue reports an empty result instead.
 */
public class Test {
/**
 * Builds the pipeline, executes it, and compares the collected join output with the expected
 * result.
 */
public static void main(String[] args) throws Exception {
// Raises the heartbeat timeout (presumably in milliseconds), likely so the job is not failed
// by a heartbeat timeout while it hangs -- confirm.
Configuration config = new Configuration();
config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000000L);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(1);
// Variable stream fed into the iteration: (1, 1), (2, 2).
DataStream<Tuple2<Long, Integer>> input1 =
env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
// Data stream handed to the iteration through ReplayableDataStreamList.replay: (1, 2), (2, 3).
DataStream<Tuple2<Long, Long>> input2 =
env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
// Runs MyIterationBody with a PER_ROUND operator lifecycle. get(0) takes the first output
// stream of the iteration -- presumably the join result `res` returned by the body; confirm.
DataStream<Tuple2<Long, Long>> iterationJoin =
Iterations.iterateBoundedStreamsUntilTermination(
DataStreamList.of(input1),
ReplayableDataStreamList.replay(input2),
IterationConfig.newBuilder()
.setOperatorLifeCycle(
IterationConfig.OperatorLifeCycle.PER_ROUND)
.build(),
new MyIterationBody())
.get(0);
// Both sides map to f0, so this is a self-join on the f0 values of the iteration output:
// every distinct f0 value should appear, hence the expected [1, 2].
DataStream<Long> left = iterationJoin.map(x -> x.f0);
DataStream<Long> right = iterationJoin.map(x -> x.f0);
DataStream<Long> result =
left.join(right)
.where(x -> x)
.equalTo(x -> x)
.window(EndOfStreamWindows.get())
.apply((JoinFunction<Long, Long, Long>) (l1, l2) -> l1);
// NOTE(review): the iterator returned by executeAndCollect() is not explicitly closed here.
List<Long> collectedResult =
IteratorUtils.toList(result.executeAndCollect());
List<Long> expectedResult = Arrays.asList(1L, 2L);
compareResultCollections(expectedResult, collectedResult,
Long::compareTo);
}
/**
 * Iteration body that window-joins the variable stream with the data stream on field f0 and
 * emits the data-stream tuple for every match.
 */
private static class MyIterationBody implements IterationBody {
@Override
public IterationBodyResult process(
DataStreamList variableStreams, DataStreamList dataStreams) {
DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
// Presumably ends the iteration after a single round (TerminateOnMaxIter is not shown here).
DataStream<Long> terminationCriteria = input1.flatMap(new
TerminateOnMaxIter(1));
// Inner join on f0 inside an end-of-stream window; each match returns the input2 tuple (t2).
// NOTE(review): with a cast lambda, Flink may be unable to infer the generic Tuple2 output
// type because of type erasure -- confirm this compiles into a runnable job.
DataStream<Tuple2<Long, Long>> res =
input1.join(input2)
.where(x -> x.f0)
.equalTo(x -> x.f0)
.window(EndOfStreamWindows.get())
.apply(
(JoinFunction<
Tuple2<Long, Integer>,
Tuple2<Long, Long>,
Tuple2<Long, Long>>)
(t1, t2) -> t2);
// Presumably (feedback variable streams, output streams, termination criteria): input1 is
// fed back unchanged and res becomes the iteration output -- confirm against the API.
return new IterationBodyResult(
DataStreamList.of(input1), DataStreamList.of(res),
terminationCriteria);
}
}
}
{code}
There are two possible causes:
* The timer in `HeadOperator` is not a daemon, so it does not exit even after
the Flink job finishes.
* The max watermark from the iteration body is lost.
> Result of join inside iterationBody loses max watermark
> -------------------------------------------------------
>
> Key: FLINK-30933
> URL: https://issues.apache.org/jira/browse/FLINK-30933
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
> Reporter: Zhipeng Zhang
> Assignee: Zhipeng Zhang
> Priority: Major
> Fix For: ml-2.2.0
>
>
> Currently, if we execute a join inside an iteration body, the following
> program produces an empty output, whereas the correct result is the list
> \{2}.
> {code:java}
> public class Test { // Repro for FLINK-30933 (quoted copy); expected output is [2].
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataStream<Tuple2<Long, Integer>> input1 = // variable stream: (1, 1), (2, 2)
> env.fromElements(Tuple2.of(1L, 1), Tuple2.of(2L, 2));
> DataStream<Tuple2<Long, Long>> input2 = // replayed data stream: (1, 2), (2, 3)
> env.fromElements(Tuple2.of(1L, 2L), Tuple2.of(2L, 3L));
> DataStream<Tuple2<Long, Long>> iterationJoin =
> Iterations.iterateBoundedStreamsUntilTermination( // runs MyIterationBody, PER_ROUND lifecycle
> DataStreamList.of(input1),
> ReplayableDataStreamList.replay(input2),
> IterationConfig.newBuilder()
> .setOperatorLifeCycle(
>
> IterationConfig.OperatorLifeCycle.PER_ROUND)
> .build(),
> new MyIterationBody())
> .get(0);
> DataStream<Long> left = iterationJoin.map(x -> x.f0); // f0 values of the iteration output
> DataStream<Long> right = iterationJoin.map(x -> x.f1); // f1 values of the iteration output
> DataStream<Long> result =
> left.join(right) // keeps only values present on both sides
> .where(x -> x)
> .equalTo(x -> x)
> .window(EndOfStreamWindows.get())
> .apply((JoinFunction<Long, Long, Long>) (l1, l2) ->
> l1);
> List<Long> collectedResult =
> IteratorUtils.toList(result.executeAndCollect());
> List<Long> expectedResult = Arrays.asList(2L); // the issue reports an empty result instead
> compareResultCollections(expectedResult, collectedResult,
> Long::compareTo);
> }
> private static class MyIterationBody implements IterationBody { // joins variable and data streams on f0
> @Override
> public IterationBodyResult process(
> DataStreamList variableStreams, DataStreamList dataStreams) {
> DataStream<Tuple2<Long, Integer>> input1 = variableStreams.get(0);
> DataStream<Tuple2<Long, Long>> input2 = dataStreams.get(0);
> DataStream<Long> terminationCriteria = input1.flatMap(new
> TerminateOnMaxIter(1)); // presumably stops after one round
> DataStream<Tuple2<Long, Long>> res =
> input1.join(input2)
> .where(x -> x.f0)
> .equalTo(x -> x.f0)
> .window(EndOfStreamWindows.get())
> .apply(
> new JoinFunction<
> Tuple2<Long, Integer>,
> Tuple2<Long, Long>,
> Tuple2<Long, Long>>() {
> @Override
> public Tuple2<Long, Long> join(
> Tuple2<Long, Integer>
> longIntegerTuple2,
> Tuple2<Long, Long>
> longLongTuple2)
> throws Exception {
> return longLongTuple2; // emit the input2 tuple for each match
> }
> });
> return new IterationBodyResult(
> DataStreamList.of(input1), DataStreamList.of(res),
> terminationCriteria);
> }
> }
> }
> {code}
>
> There are two possible causes:
> * The timer in `HeadOperator` is not a daemon, so it does not exit even
> after the Flink job finishes.
> * The max watermark from the iteration body is lost.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)