[ https://issues.apache.org/jira/browse/FLINK-39159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Grzegorz Liter closed FLINK-39159.
----------------------------------
Resolution: Not A Problem
My fault: it turned out that, in BATCH mode, the input of a KeyedProcessFunction
is internally reordered by timestamp.
My example does not assign timestamps and watermarks.
Still, it is a bit unclear to me from the documentation that it works this way, and a bit
unexpected that KeyedProcessFunction, in contrast to other functions, does not
read data in the order in which the previous stage wrote it.
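A toy, Flink-free sketch may make the explanation above concrete. It assumes, based on this comment rather than on Flink's source, that the input of a keyed operator in BATCH mode is ordered by key and then by timestamp, and it models records without a timestamp as all sharing one placeholder timestamp (also an assumption). The class and method names (BatchKeyedSortModel, sortLikeBatchKeyedInput, bucketsSorted) are hypothetical. In Flink itself, timestamps would be assigned before keyBy with assignTimestampsAndWatermarks(...), for example using WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner(...).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical, Flink-free toy model; NOT Flink's actual sorter.
public class BatchKeyedSortModel {

    // A record with its key (value % 10, as in the test) and an event timestamp.
    public record Keyed(int key, long timestamp, int value) {}

    // Orders records by key, then by timestamp (assumed model of BATCH-mode keyed input).
    // List.sort is stable, so records with equal key and timestamp keep their arrival
    // order here; the real sorter is assumed to give no such guarantee.
    public static List<Keyed> sortLikeBatchKeyedInput(List<Integer> arrival,
                                                      ToLongFunction<Integer> timestampOf) {
        List<Keyed> keyed = new ArrayList<>();
        for (Integer v : arrival) {
            keyed.add(new Keyed(v % 10, timestampOf.applyAsLong(v), v));
        }
        keyed.sort(Comparator.comparingInt(Keyed::key).thenComparingLong(Keyed::timestamp));
        return keyed;
    }

    // Same check as assertBucketsSorted: values must be ascending within every key.
    public static boolean bucketsSorted(List<Keyed> records) {
        Map<Integer, Integer> lastValuePerKey = new HashMap<>();
        for (Keyed r : records) {
            Integer previous = lastValuePerKey.put(r.key(), r.value());
            if (previous != null && previous > r.value()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Arrival order deliberately reversed: 99, 98, ..., 0.
        List<Integer> arrival = new ArrayList<>();
        for (int i = 99; i >= 0; i--) {
            arrival.add(i);
        }
        // Timestamp = value: sorting by (key, timestamp) gives ascending values per key.
        System.out.println(bucketsSorted(sortLikeBatchKeyedInput(arrival, v -> v.longValue())));
        // One shared placeholder timestamp: only the key order is imposed.
        System.out.println(bucketsSorted(sortLikeBatchKeyedInput(arrival, v -> Long.MIN_VALUE)));
    }
}
```

Running main prints true and then false. With timestamps equal to the values, ascending order within each key is restored even though the input arrives in descending order; with a shared timestamp only the key order is imposed, and because List.sort is stable the reversed arrival order survives within each key. The model cannot reproduce an unstable tie order; it only illustrates that, under this assumption, the timestamp rather than the arrival order is what defines the order within a key.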
> Using KeyedProcessFunction in batch mode reads events out of order
> ------------------------------------------------------------------
>
> Key: FLINK-39159
> URL: https://issues.apache.org/jira/browse/FLINK-39159
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 2.2.0
> Reporter: Grzegorz Liter
> Priority: Major
>
> When using a KeyedProcessFunction in BATCH mode, events from the previous step
> are read out of order within a single key.
> All tests were run on sorted input (ints 0-99) with parallelism 1.
> The following examples do not work:
> # source -> keyedProcessFunction -> sink in BATCH mode
> # source -> keyBy + fullPartitionWindow + sortPartition -> keyBy +
> keyedProcessFunction -> sink in BATCH mode
> # source -> keyBy + fullPartitionWindow + sortPartition ->
> reinterpretAsKeyedStream -> keyedProcessFunction -> sink in BATCH mode
> The following examples work:
> # all of the above in STREAMING mode
> # source -> map -> sink in BATCH mode
>
> Code snippets:
> {code:java}
> import static org.assertj.core.api.Assertions.assertThat;
>
> import java.util.List;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Test;
>
> // Class name added only so the snippet compiles on its own.
> class KeyedProcessFunctionOrderTest {
>
>     // Reported to fail: BATCH mode, keyed, no timestamps assigned.
>     @Test
>     void keyByBatchFail() throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         var result = env.fromData(IntStream.range(0, 100).boxed().toList())
>                 .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
>                 .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
>                     @Override
>                     public void processElement(
>                             Integer value,
>                             KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
>                             Collector<Integer> out) throws Exception {
>                         System.out.println("Process " + value + " from subtask "
>                                 + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
>                         out.collect(value);
>                     }
>                 })
>                 .executeAndCollect(100);
>         assertBucketsSorted(result);
>     }
>
>     // Reported to pass: same pipeline in STREAMING mode.
>     @Test
>     void keyByStreamingPass() throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>         var result = env.fromData(IntStream.range(0, 100).boxed().toList())
>                 .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
>                 .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
>                     @Override
>                     public void processElement(
>                             Integer value,
>                             KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
>                             Collector<Integer> out) throws Exception {
>                         System.out.println("Process " + value + " from subtask "
>                                 + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
>                         out.collect(value);
>                     }
>                 })
>                 .executeAndCollect(100);
>         assertBucketsSorted(result);
>     }
>
>     // Reported to pass: BATCH mode without keyBy (non-keyed ProcessFunction).
>     @Test
>     void noKeyByPass() throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         var result = env.fromData(IntStream.range(0, 100).boxed().toList())
>                 .process(new ProcessFunction<Integer, Integer>() {
>                     @Override
>                     public void processElement(
>                             Integer value,
>                             ProcessFunction<Integer, Integer>.Context ctx,
>                             Collector<Integer> out) throws Exception {
>                         System.out.println("Process " + value + " from subtask "
>                                 + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
>                         out.collect(value);
>                     }
>                 })
>                 .executeAndCollect(100);
>         assertBucketsSorted(result);
>     }
>
>     // Groups results by key (value % 10) and checks each group is in ascending order.
>     private void assertBucketsSorted(List<Integer> result) {
>         var buckets = result.stream()
>                 .collect(Collectors.groupingBy(i -> i % 10));
>         buckets.forEach((bucket, values) -> {
>             var isSorted = IntStream.range(0, values.size() - 1)
>                     .allMatch(i -> values.get(i) <= values.get(i + 1));
>             assertThat(isSorted).as("bucket %d [%s] should be sorted", bucket, values).isTrue();
>         });
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)