This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d32f1b539cd326ae9d616fb5a8ba57dca860bfd
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Sep 11 10:22:29 2024 +0200

    [FLINK-25920] Only process complete batches of committables
    
    The committer is supposed to commit all committables of a given subtask at
    once (so that it can potentially optimize committables on the fly). With
    unaligned checkpoints (UCs), notifyCheckpointComplete may be called before
    all committables have been received. The CommittableSummary exists to
    detect this situation.
    
    So far, we enforced completeness only for the most recent committables,
    i.e., those belonging to the checkpoint being completed. However, we should
    also enforce it for all subsumed committables. In practice, this probably
    already happens implicitly, but an extra code path allowed subsumed
    committables to be incomplete. This commit simplifies the code by always
    enforcing completeness.
---
 .../connector/sink2/GlobalCommitterOperator.java   |  5 ++---
 .../runtime/operators/sink/CommitterOperator.java  | 22 +++++++---------------
 .../CheckpointCommittableManagerImpl.java          |  9 ++++-----
 .../sink/committables/CommittableManager.java      |  8 +++-----
 .../CheckpointCommittableManagerImplTest.java      | 18 +++++++++++-------
 5 files changed, 27 insertions(+), 35 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
index 5776ae43b00..86697081436 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
@@ -175,8 +175,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends 
AbstractStreamOperator
             sinkV1State = globalCommitter.commit(sinkV1State);
         }
         for (CheckpointCommittableManager<CommT> committable : 
getCommittables(checkpointId)) {
-            boolean fullyReceived = committable.getCheckpointId() == 
lastCompletedCheckpointId;
-            committable.commit(fullyReceived, committer);
+            committable.commit(committer);
         }
     }
 
@@ -186,7 +185,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends 
AbstractStreamOperator
                 committableCollector.getEndOfInputCommittable();
         if (endOfInputCommittable != null) {
             do {
-                endOfInputCommittable.commit(false, committer);
+                endOfInputCommittable.commit(committer);
             } while (!committableCollector.isFinished());
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 484d7a712c3..e33594475e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -149,30 +149,23 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
         endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be 
committed here
-            notifyCheckpointComplete(EOI);
+            commitAndEmitCheckpoints();
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        if (endInput) {
-            // This is the final checkpoint, all committables should be 
committed
-            lastCompletedCheckpointId = Long.MAX_VALUE;
-        } else {
-            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
-        }
+        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
         commitAndEmitCheckpoints();
     }
 
     private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
+        long completedCheckpointId = endInput ? EOI : 
lastCompletedCheckpointId;
         do {
             for (CheckpointCommittableManager<CommT> manager :
-                    
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
-                // wait for all committables of the current manager before 
submission
-                boolean fullyReceived =
-                        !endInput && manager.getCheckpointId() == 
lastCompletedCheckpointId;
-                commitAndEmit(manager, fullyReceived);
+                    
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                commitAndEmit(manager);
             }
             // !committableCollector.isFinished() indicates that we should 
retry
             // Retry should be done here if this is a final checkpoint 
(indicated by endInput)
@@ -185,10 +178,9 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
         }
     }
 
-    private void commitAndEmit(CommittableManager<CommT> committableManager, 
boolean fullyReceived)
+    private void commitAndEmit(CommittableManager<CommT> committableManager)
             throws IOException, InterruptedException {
-        Collection<CommittableWithLineage<CommT>> committed =
-                committableManager.commit(fullyReceived, committer);
+        Collection<CommittableWithLineage<CommT>> committed = 
committableManager.commit(committer);
         if (emitDownstream && !committed.isEmpty()) {
             output.collect(new 
StreamRecord<>(committableManager.getSummary()));
             for (CommittableWithLineage<CommT> committable : committed) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 09a33b72651..d98ec256e50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -120,10 +120,9 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     @Override
-    public Collection<CommittableWithLineage<CommT>> commit(
-            boolean fullyReceived, Committer<CommT> committer)
+    public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> 
committer)
             throws IOException, InterruptedException {
-        Collection<CommitRequestImpl<CommT>> requests = 
getPendingRequests(fullyReceived);
+        Collection<CommitRequestImpl<CommT>> requests = 
getPendingRequests(true);
         requests.forEach(CommitRequestImpl::setSelected);
         committer.commit(new ArrayList<>(requests));
         requests.forEach(CommitRequestImpl::setCommittedIfNoError);
@@ -132,9 +131,9 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
         return committed;
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean 
fullyReceived) {
+    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean 
onlyIfFullyReceived) {
         return subtasksCommittableManagers.values().stream()
-                .filter(subtask -> !fullyReceived || subtask.hasReceivedAll())
+                .filter(subtask -> !onlyIfFullyReceived || 
subtask.hasReceivedAll())
                 .flatMap(SubtaskCommittableManager::getPendingRequests)
                 .collect(Collectors.toList());
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
index 9b5aa9deccb..f6f176d6748 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java
@@ -37,16 +37,14 @@ public interface CommittableManager<CommT> {
     CommittableSummary<CommT> getSummary();
 
     /**
-     * Commits all due committables.
+     * Commits all due committables if all respective committables of the 
specific subtask and
+     * checkpoint have been received.
      *
-     * @param fullyReceived only commit committables if all committables of 
this checkpoint for a
-     *     subtask are received
      * @param committer used to commit to the external system
      * @return successfully committed committables with meta information
      * @throws IOException
      * @throws InterruptedException
      */
-    Collection<CommittableWithLineage<CommT>> commit(
-            boolean fullyReceived, Committer<CommT> committer)
+    Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> 
committer)
             throws IOException, InterruptedException;
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index 0ae88631a58..5bcebda497f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -75,17 +75,21 @@ class CheckpointCommittableManagerImplTest {
 
         final Committer<Integer> committer = new NoOpCommitter();
         // Only commit fully received committables
-        Collection<CommittableWithLineage<Integer>> commitRequests =
-                checkpointCommittables.commit(true, committer);
-        assertThat(commitRequests)
+        assertThat(checkpointCommittables.commit(committer))
                 .hasSize(1)
                 .satisfiesExactly(c -> 
assertThat(c.getCommittable()).isEqualTo(3));
 
+        // Even on retry
+        assertThat(checkpointCommittables.commit(committer)).isEmpty();
+
+        // Add missing committable
+        checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 
1L, 2));
         // Commit all committables
-        commitRequests = checkpointCommittables.commit(false, committer);
-        assertThat(commitRequests)
-                .hasSize(1)
-                .satisfiesExactly(c -> 
assertThat(c.getCommittable()).isEqualTo(4));
+        assertThat(checkpointCommittables.commit(committer))
+                .hasSize(2)
+                .satisfiesExactly(
+                        c -> assertThat(c.getCommittable()).isEqualTo(4),
+                        c -> assertThat(c.getCommittable()).isEqualTo(5));
     }
 
     @Test

Reply via email to