This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1d32f1b539cd326ae9d616fb5a8ba57dca860bfd Author: Arvid Heise <[email protected]> AuthorDate: Wed Sep 11 10:22:29 2024 +0200 [FLINK-25920] Only process complete batches of committables The committer is supposed to commit all committables at once for a given subtask (so that it can potentially optimize committables on the fly). With unaligned checkpoints (UCs), we could potentially see notifyCheckpointCompleted before receiving all committables. The CommittableSummary was built and is used to detect that. So far, we enforced completeness only for the most current committables belonging to the respective checkpoint being completed. However, we should also enforce it for all subsumed committables. In fact, we probably implicitly do it but we have the extra code path which allows subsumed committables to be incomplete. This commit simplifies the code a bit by always enforcing completeness. --- .../connector/sink2/GlobalCommitterOperator.java | 5 ++--- .../runtime/operators/sink/CommitterOperator.java | 22 +++++++--------------- .../CheckpointCommittableManagerImpl.java | 9 ++++----- .../sink/committables/CommittableManager.java | 8 +++----- .../CheckpointCommittableManagerImplTest.java | 18 +++++++++++------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java index 5776ae43b00..86697081436 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java @@ -175,8 +175,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator sinkV1State = globalCommitter.commit(sinkV1State); } for (CheckpointCommittableManager<CommT> committable : getCommittables(checkpointId)) { - boolean 
fullyReceived = committable.getCheckpointId() == lastCompletedCheckpointId; - committable.commit(fullyReceived, committer); + committable.commit(committer); } } @@ -186,7 +185,7 @@ class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator committableCollector.getEndOfInputCommittable(); if (endOfInputCommittable != null) { do { - endOfInputCommittable.commit(false, committer); + endOfInputCommittable.commit(committer); } while (!committableCollector.isFinished()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 484d7a712c3..e33594475e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -149,30 +149,23 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - notifyCheckpointComplete(EOI); + commitAndEmitCheckpoints(); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - if (endInput) { - // This is the final checkpoint, all committables should be committed - lastCompletedCheckpointId = Long.MAX_VALUE; - } else { - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - } + lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); commitAndEmitCheckpoints(); } private void commitAndEmitCheckpoints() throws IOException, InterruptedException { + long completedCheckpointId = endInput ? 
EOI : lastCompletedCheckpointId; do { for (CheckpointCommittableManager<CommT> manager : - committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) { - // wait for all committables of the current manager before submission - boolean fullyReceived = - !endInput && manager.getCheckpointId() == lastCompletedCheckpointId; - commitAndEmit(manager, fullyReceived); + committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + commitAndEmit(manager); } // !committableCollector.isFinished() indicates that we should retry // Retry should be done here if this is a final checkpoint (indicated by endInput) @@ -185,10 +178,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage } } - private void commitAndEmit(CommittableManager<CommT> committableManager, boolean fullyReceived) + private void commitAndEmit(CommittableManager<CommT> committableManager) throws IOException, InterruptedException { - Collection<CommittableWithLineage<CommT>> committed = - committableManager.commit(fullyReceived, committer); + Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer); if (emitDownstream && !committed.isEmpty()) { output.collect(new StreamRecord<>(committableManager.getSummary())); for (CommittableWithLineage<CommT> committable : committed) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 09a33b72651..d98ec256e50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -120,10 +120,9 @@ class CheckpointCommittableManagerImpl<CommT> 
implements CheckpointCommittableMa } @Override - public Collection<CommittableWithLineage<CommT>> commit( - boolean fullyReceived, Committer<CommT> committer) + public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer) throws IOException, InterruptedException { - Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(fullyReceived); + Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(true); requests.forEach(CommitRequestImpl::setSelected); committer.commit(new ArrayList<>(requests)); requests.forEach(CommitRequestImpl::setCommittedIfNoError); @@ -132,9 +131,9 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa return committed; } - Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean fullyReceived) { + Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) { return subtasksCommittableManagers.values().stream() - .filter(subtask -> !fullyReceived || subtask.hasReceivedAll()) + .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll()) .flatMap(SubtaskCommittableManager::getPendingRequests) .collect(Collectors.toList()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java index 9b5aa9deccb..f6f176d6748 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableManager.java @@ -37,16 +37,14 @@ public interface CommittableManager<CommT> { CommittableSummary<CommT> getSummary(); /** - * Commits all due committables. + * Commits all due committables if all respective committables of the specific subtask and + * checkpoint have been received. 
* - * @param fullyReceived only commit committables if all committables of this checkpoint for a - * subtask are received * @param committer used to commit to the external system * @return successfully committed committables with meta information * @throws IOException * @throws InterruptedException */ - Collection<CommittableWithLineage<CommT>> commit( - boolean fullyReceived, Committer<CommT> committer) + Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer) throws IOException, InterruptedException; } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java index 0ae88631a58..5bcebda497f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java @@ -75,17 +75,21 @@ class CheckpointCommittableManagerImplTest { final Committer<Integer> committer = new NoOpCommitter(); // Only commit fully received committables - Collection<CommittableWithLineage<Integer>> commitRequests = - checkpointCommittables.commit(true, committer); - assertThat(commitRequests) + assertThat(checkpointCommittables.commit(committer)) .hasSize(1) .satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(3)); + // Even on retry + assertThat(checkpointCommittables.commit(committer)).isEmpty(); + + // Add missing committable + checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2)); // Commit all committables - commitRequests = checkpointCommittables.commit(false, committer); - assertThat(commitRequests) - .hasSize(1) - .satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(4)); + 
assertThat(checkpointCommittables.commit(committer)) + .hasSize(2) + .satisfiesExactly( + c -> assertThat(c.getCommittable()).isEqualTo(4), + c -> assertThat(c.getCommittable()).isEqualTo(5)); } @Test
