Grzegorz Liter created FLINK-39159:
--------------------------------------

             Summary: Using KeyedProcessFunction in batch mode reads events out 
of order
                 Key: FLINK-39159
                 URL: https://issues.apache.org/jira/browse/FLINK-39159
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 2.2.0
            Reporter: Grzegorz Liter


When a KeyedProcessFunction is used in BATCH mode, events from the previous step 
are not read in order within the scope of a single key.

All tests were run on sorted input (integers 0-99) with parallelism 1.

The following examples do not work:
 # source -> keyedProcessFunction -> sink in BATCH mode
 # source -> keyBy + fullPartitionWindow + sortPartition -> keyBy + keyedProcessFunction -> sink in BATCH mode
 # source -> keyBy + fullPartitionWindow + sortPartition -> reinterpretAsKeyedStream -> keyedProcessFunction -> sink in BATCH mode

The following examples work:
 # all of the above in STREAMING mode
 # source -> map -> sink in BATCH mode

 

Code snippets:
{code:java}
/**
 * Keys the sorted input 0..99 by {@code value % 10}, passes every element unchanged through a
 * {@link KeyedProcessFunction} in BATCH mode, and checks that each key's values are collected
 * in ascending order.
 *
 * <p>Per this report, this test fails on Flink 2.2.0: within a single key the values are not
 * read in order.
 */
@Test
void keyByBatchFail() throws Exception {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    // The report states all tests run with parallelism 1; pin it explicitly so the
    // reproducer does not depend on the environment's default parallelism.
    env.setParallelism(1);

    var result = env.fromData(IntStream.range(0, 100).boxed().toList())
            .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
            .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                @Override
                public void processElement(
                        Integer value,
                        KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
                        Collector<Integer> out) throws Exception {
                    // Log arrival order and the subtask that handled the element.
                    System.out.println("Process " + value + " from subtask "
                            + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                    out.collect(value);
                }
            })
            // Collect as many records as the source emits (100).
            .executeAndCollect(100);

    assertBucketsSorted(result);
}

/**
 * Runs the same keyed pipeline as {@code keyByBatchFail}, but in STREAMING mode. It keys the
 * sorted input 0..99 by {@code value % 10}, passes every element unchanged through a
 * {@link KeyedProcessFunction}, and checks that each key's values are collected in ascending
 * order.
 *
 * <p>Per this report, this variant passes.
 */
@Test
void keyByStreamingPass() throws Exception {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    // The report states all tests run with parallelism 1; pin it explicitly so the
    // reproducer does not depend on the environment's default parallelism.
    env.setParallelism(1);

    var result = env.fromData(IntStream.range(0, 100).boxed().toList())
            .keyBy((KeySelector<Integer, Integer>) integer -> integer % 10)
            .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                @Override
                public void processElement(
                        Integer value,
                        KeyedProcessFunction<Integer, Integer, Integer>.Context ctx,
                        Collector<Integer> out) throws Exception {
                    // Log arrival order and the subtask that handled the element.
                    System.out.println("Process " + value + " from subtask "
                            + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                    out.collect(value);
                }
            })
            // Collect as many records as the source emits (100).
            .executeAndCollect(100);

    assertBucketsSorted(result);
}

/**
 * Control case. The sorted input 0..99 goes through a non-keyed {@link ProcessFunction} in
 * BATCH mode, unchanged. The test then checks that the values in each {@code value % 10}
 * bucket are collected in ascending order.
 *
 * <p>Per this report, this variant passes.
 */
@Test
void noKeyByPass() throws Exception {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    // The report states all tests run with parallelism 1; pin it explicitly so the
    // reproducer does not depend on the environment's default parallelism.
    env.setParallelism(1);

    var result = env.fromData(IntStream.range(0, 100).boxed().toList())
            .process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(
                        Integer value,
                        ProcessFunction<Integer, Integer>.Context ctx,
                        Collector<Integer> out) throws Exception {
                    // Log arrival order and the subtask that handled the element.
                    System.out.println("Process " + value + " from subtask "
                            + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                    out.collect(value);
                }
            })
            // Collect as many records as the source emits (100).
            .executeAndCollect(100);

    assertBucketsSorted(result);
}

/**
 * Groups {@code result} by {@code value % 10} and asserts that every group lists its values in
 * non-decreasing order. Within a group, values keep their relative order from {@code result}.
 *
 * @param result the records collected from the job under test
 */
private void assertBucketsSorted(List<Integer> result) {
    var valuesByBucket = result.stream()
            .collect(Collectors.groupingBy(value -> value % 10));
    for (var entry : valuesByBucket.entrySet()) {
        var bucketValues = entry.getValue();
        // Scan adjacent pairs and stop at the first descending step.
        boolean nonDecreasing = true;
        for (int idx = 1; nonDecreasing && idx < bucketValues.size(); idx++) {
            nonDecreasing = bucketValues.get(idx - 1) <= bucketValues.get(idx);
        }
        assertThat(nonDecreasing)
                .as("bucket %d [%s] should be sorted", entry.getKey(), bucketValues)
                .isTrue();
    }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to