Grzegorz Liter created FLINK-39159:
--------------------------------------
Summary: Using KeyedProcessFunction in batch mode reads events out of order
Key: FLINK-39159
URL: https://issues.apache.org/jira/browse/FLINK-39159
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 2.2.0
Reporter: Grzegorz Liter
When a KeyedProcessFunction is used in BATCH mode, the events it receives from the previous step are not read in order within the scope of a single key.
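All tests were run with parallelism 1 on sorted input (the integers 0 to 99, generated with IntStream.range(0, 100)), keyed by value % 10 where a key is used.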
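For reference, the expected per-key order follows directly from that input: with the key selector integer % 10 used in the tests, key 3 should receive 3, 13, 23, ..., 93 in that order. The sketch below is plain JDK with no Flink dependency and only derives that expectation; the names ExpectedKeyOrder and expectedPerKey are hypothetical helpers, not part of Flink or of the original tests.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper (not a Flink API): computes the order in which a keyed
// operator that preserves input order should emit values for each key.
final class ExpectedKeyOrder {

    // Groups the sorted input 0..(n-1) by key (value % buckets).
    // On a sequential stream, groupingBy + toList keeps encounter order,
    // so every per-key list stays ascending.
    static Map<Integer, List<Integer>> expectedPerKey(int n, int buckets) {
        return IntStream.range(0, n)
                .boxed()
                .collect(Collectors.groupingBy(
                        i -> i % buckets,
                        TreeMap::new,
                        Collectors.toList()));
    }

    public static void main(String[] args) {
        // Same input and key selector as the tests: IntStream.range(0, 100), key = value % 10.
        Map<Integer, List<Integer>> expected = expectedPerKey(100, 10);
        System.out.println("key 3 -> " + expected.get(3));
    }
}
{code}

Running main prints key 3 -> [3, 13, 23, 33, 43, 53, 63, 73, 83, 93]; in the failing BATCH runs, the collected output for at least one key deviates from such a list.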
The following pipelines fail (values for the same key are emitted out of order):
# source -> keyBy + KeyedProcessFunction -> sink in BATCH mode
# source -> keyBy + fullWindowPartition + sortPartition -> keyBy + KeyedProcessFunction -> sink in BATCH mode
# source -> keyBy + fullWindowPartition + sortPartition -> reinterpretAsKeyedStream -> KeyedProcessFunction -> sink in BATCH mode
The following pipelines work as expected:
# all of the above in STREAMING mode
# source -> map -> sink in BATCH mode
Code snippets:
{code:java}
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

// Enclosing class name is illustrative; the original snippet listed only the methods.
class KeyedProcessFunctionOrderTest {

    // Fails: keyBy + KeyedProcessFunction in BATCH mode.
    @Test
    void keyByBatchFail() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        var result = env.fromData(IntStream.range(0, 100).boxed().toList())
                .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
                .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement(
                            Integer value,
                            KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
                            Collector<Integer> out) throws Exception {
                        // Log each value together with the subtask that processed it.
                        System.out.println("Process " + value + " from subtask "
                                + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                        out.collect(value);
                    }
                })
                .executeAndCollect(100);
        assertBucketsSorted(result);
    }

    // Passes: the same pipeline in STREAMING mode.
    @Test
    void keyByStreamingPass() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        var result = env.fromData(IntStream.range(0, 100).boxed().toList())
                .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
                .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement(
                            Integer value,
                            KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
                            Collector<Integer> out) throws Exception {
                        // Log each value together with the subtask that processed it.
                        System.out.println("Process " + value + " from subtask "
                                + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                        out.collect(value);
                    }
                })
                .executeAndCollect(100);
        assertBucketsSorted(result);
    }

    // Passes: no keyBy, plain ProcessFunction in BATCH mode.
    @Test
    void noKeyByPass() throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        var result = env.fromData(IntStream.range(0, 100).boxed().toList())
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(
                            Integer value,
                            ProcessFunction<Integer, Integer>.Context ctx,
                            Collector<Integer> out) throws Exception {
                        // Log each value together with the subtask that processed it.
                        System.out.println("Process " + value + " from subtask "
                                + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                        out.collect(value);
                    }
                })
                .executeAndCollect(100);
        assertBucketsSorted(result);
    }

    // Groups the collected values by key (value % 10) and checks that each group
    // is in non-decreasing order, i.e. the order of the sorted input.
    private void assertBucketsSorted(List<Integer> result) {
        var buckets = result.stream()
                .collect(Collectors.groupingBy(i -> i % 10));
        buckets.forEach((bucket, values) -> {
            var isSorted = IntStream.range(0, values.size() - 1)
                    .allMatch(i -> values.get(i) <= values.get(i + 1));
            assertThat(isSorted)
                    .as("bucket %d [%s] should be sorted", bucket, values)
                    .isTrue();
        });
    }
}
{code}
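When triaging, it can help to see where the order breaks, not only that it breaks. The following is a plain-JDK sketch of a diagnostic variant of assertBucketsSorted, assuming the same key function (value % buckets): for each key it returns the first adjacent pair that is out of order. OrderCheck and firstViolations are hypothetical names, not part of Flink or of the original tests.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper (not part of Flink or the original tests).
final class OrderCheck {

    // For each key (value % buckets), describes the first adjacent pair where a
    // later value is smaller than the one before it. Keys whose values are in
    // non-decreasing order are omitted from the returned map.
    static Map<Integer, String> firstViolations(List<Integer> result, int buckets) {
        Map<Integer, List<Integer>> byKey = result.stream()
                .collect(Collectors.groupingBy(i -> i % buckets, TreeMap::new, Collectors.toList()));
        Map<Integer, String> violations = new TreeMap<>();
        byKey.forEach((key, values) -> {
            for (int i = 0; i + 1 < values.size(); i++) {
                if (values.get(i) > values.get(i + 1)) {
                    violations.put(key, "index " + i + ": " + values.get(i) + " > " + values.get(i + 1));
                    break; // only the first violation per key is reported
                }
            }
        });
        return violations;
    }

    public static void main(String[] args) {
        // Key 3 receives 13 before 3; key 1 is in order.
        List<Integer> sample = List.of(1, 13, 3, 11, 23);
        System.out.println(firstViolations(sample, 10));
    }
}
{code}

For the sample in main this prints {3=index 0: 13 > 3}; passing the list returned by executeAndCollect(100) instead would show, per key, the first position where the BATCH run diverges from the sorted input.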
--
This message was sent by Atlassian Jira
(v8.20.10#820010)