This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 691b8f9b8a1 [release-1.20][FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27013)
691b8f9b8a1 is described below

commit 691b8f9b8a182eeb29fb1306afb350d4b5995e31
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Fri Sep 19 14:39:14 2025 +0200

    [release-1.20][FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27013)
    
    In #26433, we removed the end-of-input (EOI) marker, which used Long.MAX_VALUE as the
    checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective
    operators have been shut down, it is not safe to use a constant checkpoint id there, as this can
    lead to duplicate commits.
    
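    However, in batch pipelines there is only one commit, on job shutdown, and no further checkpoints
    take place afterwards. Using a constant checkpoint id (Long.MAX_VALUE) is therefore safe in this
    scenario. All pending committables should be processed by the CommitterOperator when the operator
    shuts down; committing only up to lastCompletedCheckpointId + 1 could leave committables tagged
    with a higher checkpoint id (such as Long.MAX_VALUE) uncommitted.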
    
    Various connectors rely on this behavior, and I don't see any drawbacks to keeping it for batch
    pipelines.
---
 .../runtime/operators/sink/CommitterOperator.java  |  2 +-
 .../operators/sink/CommitterOperatorTestBase.java  | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 6954ad24e36..2f766a341cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -148,7 +148,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     public void endInput() throws Exception {
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be 
committed here
-            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
+            commitAndEmitCheckpoints(Long.MAX_VALUE);
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index c8b37943846..ee58c8b94ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -276,6 +276,33 @@ abstract class CommitterOperatorTestBase {
         assertThat(testHarness.getOutput()).hasSize(2);
     }
 
+    @Test
+    void testEmitCommittablesBatch() throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithoutPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
CommitterOperatorFactory<>(sinkAndCounters.sink, true, false));
+        testHarness.open();
+
+        // Test that all committables up to Long.MAX_VALUE are committed.
+        long checkpointId = Long.MAX_VALUE;
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", checkpointId, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        testHarness.endInput();
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
+        assertThat(testHarness.getOutput()).isEmpty();
+
+        testHarness.close();
+    }
+
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(

Reply via email to