This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push: new 7025a3f1ec1 [FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27015) 7025a3f1ec1 is described below commit 7025a3f1ec19b8ca9b537d2dce1805a2624c5953 Author: Maximilian Michels <m...@apache.org> AuthorDate: Mon Sep 22 09:54:28 2025 +0200 [FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27015) In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective operators have been shut down, it is not safe to use a constant as this can lead to duplicate commits. However, in batch pipelines we only have one commit on job shutdown. Using any checkpoint id should suffice in this scenario. Any pending committables should be processed by the CommitterOperator when the operator shuts down. No further checkpoints will take place. There are various connectors which rely on this behavior. I don't see any drawbacks from keeping this behavior for batch pipelines. 
--- .../runtime/operators/sink/CommitterOperator.java | 2 +- .../sink/SinkV2CommitterOperatorTest.java | 45 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 4a640fedf75..7fecc8abffe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -151,7 +151,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage public void endInput() throws Exception { if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); + commitAndEmitCheckpoints(Long.MAX_VALUE); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java index 03a0c791b3a..0025acdf2f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java @@ -36,11 +36,14 @@ import org.assertj.core.api.AbstractThrowableAssert; import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import 
java.util.Collection; import java.util.function.IntSupplier; +import java.util.stream.Stream; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; @@ -85,9 +88,17 @@ class SinkV2CommitterOperatorTest { () -> committer.successfulCommits); } + static Stream<Arguments> testParameters() { + return Stream.of( + Arguments.of(true, false), + Arguments.of(true, true), + Arguments.of(false, false), + Arguments.of(false, true)); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitCommittables(boolean withPostCommitTopology) throws Exception { + @MethodSource("testParameters") + void testEmitCommittables(boolean withPostCommitTopology, boolean isBatch) throws Exception { SinkAndCounters sinkAndCounters; if (withPostCommitTopology) { // Insert global committer to simulate post commit topology @@ -99,7 +110,8 @@ class SinkV2CommitterOperatorTest { CommittableMessage<String>, CommittableMessage<String>> testHarness = new OneInputStreamOperatorTestHarness<>( - new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true)); + new CommitterOperatorFactory<>( + sinkAndCounters.sink, isBatch, true)); testHarness.open(); final CommittableSummary<String> committableSummary = @@ -127,6 +139,33 @@ class SinkV2CommitterOperatorTest { testHarness.close(); } + @Test + void testEmitCommittablesBatch() throws Exception { + SinkAndCounters sinkAndCounters = sinkWithoutPostCommit(); + final OneInputStreamOperatorTestHarness< + CommittableMessage<String>, CommittableMessage<String>> + testHarness = + new OneInputStreamOperatorTestHarness<>( + new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false)); + testHarness.open(); + + // Test that all committables up to Long.MAX_VALUE are committed. 
+ long checkpointId = Long.MAX_VALUE; + final CommittableSummary<String> committableSummary = + new CommittableSummary<>(1, 1, checkpointId, 1, 0); + testHarness.processElement(new StreamRecord<>(committableSummary)); + final CommittableWithLineage<String> committableWithLineage = + new CommittableWithLineage<>("1", checkpointId, 1); + testHarness.processElement(new StreamRecord<>(committableWithLineage)); + + testHarness.endInput(); + + assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.close(); + } + @Test void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { SinkAndCounters sinkAndCounters = sinkWithPostCommit();