Xuannan Su created FLINK-28126:
----------------------------------
Summary: Iteration gets stuck when replayable datastream and its
downstream operator have different parallelism
Key: FLINK-28126
URL: https://issues.apache.org/jira/browse/FLINK-28126
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Affects Versions: ml-2.0.0
Reporter: Xuannan Su
The iteration gets stuck when a replayable data stream and its downstream
operator have different parallelism. The following test reproduces the
problem:
{code:java}
@Test
public void testIteration() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    final SingleOutputStreamOperator<Integer> variable =
            env.fromElements(0).name("i");
    // test can pass if parallelism is 2.
    final SingleOutputStreamOperator<Integer> data =
            env.fromElements(1, 2).name("inc")
                    .map(x -> x).setParallelism(1);

    final IterationConfig config = IterationConfig.newBuilder().build();

    Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(variable),
            ReplayableDataStreamList.replay(data),
            config,
            (IterationBody) (variableStreams, dataStreams) -> {
                final DataStream<Integer> sample = dataStreams.get(0);

                final SingleOutputStreamOperator<Integer> trainOutput =
                        sample
                                .transform(
                                        "iter",
                                        TypeInformation.of(Integer.class),
                                        new IterTransform())
                                .setParallelism(2)
                                .map((MapFunction<Integer, Integer>)
                                        integer -> integer)
                                .setParallelism(1);

                return new IterationBodyResult(
                        DataStreamList.of(trainOutput),
                        DataStreamList.of(trainOutput));
            });

    env.execute();
}
public static class IterTransform extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer>,
                IterationListener<Integer> {

    @Override
    public void processElement(StreamRecord<Integer> element)
            throws Exception {
        LOG.info("Processing element: {}", element);
    }

    @Override
    public void onEpochWatermarkIncremented(
            int epochWatermark, Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
        if (epochWatermark >= 10) {
            return;
        }
        collector.collect(0);
    }

    @Override
    public void onIterationTerminated(
            Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onIterationTerminated");
    }
}
{code}
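After digging into the code, it appears that the `ReplayOperator` does not emit
the epoch watermark through a broadcast output, so when the downstream
parallelism differs, some downstream subtasks may never receive it.
[~gaoyunhaii], could you take a look and confirm whether this is the case?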
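
To illustrate the suspected cause, here is a minimal plain-Java toy model. It is not Flink code: the class `EpochWatermarkSketch` and its methods are hypothetical names, and it assumes that a non-broadcast output delivers the watermark to a single channel (channel 0 here) and that an epoch can only finish once every downstream subtask has seen the watermark.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of one epoch watermark reaching downstream subtasks. */
public class EpochWatermarkSketch {

    /**
     * Returns, for each downstream subtask, whether it received the epoch
     * watermark emitted by a single upstream subtask.
     */
    public static List<Boolean> deliver(int downstreamParallelism, boolean broadcast) {
        List<Boolean> received = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            // Broadcast reaches every channel. Otherwise only one channel gets
            // the watermark (assumed to be channel 0 in this sketch).
            received.add(broadcast || i == 0);
        }
        return received;
    }

    /** Assumption: the epoch finishes only if every subtask saw the watermark. */
    public static boolean epochCanAdvance(int downstreamParallelism, boolean broadcast) {
        return !deliver(downstreamParallelism, broadcast).contains(false);
    }

    public static void main(String[] args) {
        System.out.println(epochCanAdvance(1, false)); // same parallelism
        System.out.println(epochCanAdvance(2, false)); // different parallelism, no broadcast
        System.out.println(epochCanAdvance(2, true));  // different parallelism, broadcast
    }
}
```

Under these assumptions, only the non-broadcast case with a downstream parallelism of 2 fails to advance, which would match the observation that the test passes once the parallelism of the two operators is the same.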