This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push: new 691b8f9b8a1 [release-1.20][FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27013) 691b8f9b8a1 is described below commit 691b8f9b8a182eeb29fb1306afb350d4b5995e31 Author: Maximilian Michels <m...@apache.org> AuthorDate: Fri Sep 19 14:39:14 2025 +0200 [release-1.20][FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27013) In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective operators have been shut down, it is not safe to use a constant as this can lead to duplicate commits. However, in batch pipelines we only have one commit on job shutdown. Using a constant checkpoint id is therefore safe in this scenario, and using Long.MAX_VALUE guarantees that every pending committable is included regardless of its checkpoint id. Any pending committables should be processed by the CommitterOperator when the operator shuts down. No further checkpoints will take place. There are various connectors which rely on this behavior. I don't see any drawbacks from keeping this behavior for batch pipelines. 
--- .../runtime/operators/sink/CommitterOperator.java | 2 +- .../operators/sink/CommitterOperatorTestBase.java | 27 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 6954ad24e36..2f766a341cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -148,7 +148,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage public void endInput() throws Exception { if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); + commitAndEmitCheckpoints(Long.MAX_VALUE); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index c8b37943846..ee58c8b94ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -276,6 +276,33 @@ abstract class CommitterOperatorTestBase { assertThat(testHarness.getOutput()).hasSize(2); } + @Test + void testEmitCommittablesBatch() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithoutPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new 
CommitterOperatorFactory<>(sinkAndCounters.sink, true, false)); + testHarness.open(); + + // Test that all committables up to Long.MAX_VALUE are committed. + long checkpointId = Long.MAX_VALUE; + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", checkpointId, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.close(); + } + private OneInputStreamOperatorTestHarness< CommittableMessage<String>, CommittableMessage<String>> createTestHarness(