This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb043471604ee9564876aebf85e415181f217a17
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Oct 18 15:00:19 2024 +0200

    [FLINK-36455] Sinks retry synchronously
    
    Sinks so far retried asynchronously to increase commit throughput in case 
of temporary issues. However, the contract of notifyCheckpointCompleted states 
that checkpoints must be side-effect free meaning all transactions have to be 
committed on return of the RPC call.
    
    This commit retries a fixed number of times and then fails in 
notifyCheckpointCompleted.
    
    Note that sync retries significantly simplify the committable handling. 
This commit starts a few simplifications; the next commit clears up more.
    
    (cherry picked from commit bc0f241b86799a39d7ce08e5902e47c71bdaf68f)
---
 .../generated/common_miscellaneous_section.html    |  6 +++
 .../apache/flink/configuration/SinkOptions.java    | 42 +++++++++++++++
 .../connector/sink2/GlobalCommitterOperator.java   | 27 ++++------
 .../runtime/operators/sink/CommitterOperator.java  | 60 +++++++++++----------
 .../committables/CheckpointCommittableManager.java | 14 ++---
 .../CheckpointCommittableManagerImpl.java          | 62 +++++++++-------------
 .../sink/committables/CommittableCollector.java    |  9 ++--
 .../committables/SubtaskCommittableManager.java    |  9 +++-
 .../sink2/GlobalCommitterOperatorTest.java         |  1 +
 .../operators/sink/CommitterOperatorTestBase.java  | 35 ++++++++++--
 .../CheckpointCommittableManagerImplTest.java      | 18 +++----
 .../CommittableCollectorSerializerTest.java        | 10 ++--
 .../committables/CommittableCollectorTest.java     |  6 +--
 .../operators/sink/deprecated/TestSinkV2.java      |  6 +--
 14 files changed, 184 insertions(+), 121 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/common_miscellaneous_section.html 
b/docs/layouts/shortcodes/generated/common_miscellaneous_section.html
index 6ac31199dc7..26c6b3677df 100644
--- a/docs/layouts/shortcodes/generated/common_miscellaneous_section.html
+++ b/docs/layouts/shortcodes/generated/common_miscellaneous_section.html
@@ -26,5 +26,11 @@
             <td>String</td>
             <td>Directories for temporary files, separated by",", "|", or the 
system's java.io.File.pathSeparator.</td>
         </tr>
+        <tr>
+            <td><h5>sink.committer.retries</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The number of retries a Flink application attempts for 
committable operations (such as transactions) on retriable errors, as specified 
by the sink connector, before Flink fails and potentially restarts.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java
new file mode 100644
index 00000000000..2cf1431d939
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SinkOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options for sinks. */
+@PublicEvolving
+public class SinkOptions {
+    /**
+     * The number of retries on a committable (e.g., transaction) before Flink 
application fails and
+     * potentially restarts.
+     */
+    @Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS)
+    public static final ConfigOption<Integer> COMMITTER_RETRIES =
+            key("sink.committer.retries")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The number of retries a Flink application 
attempts for committable operations (such as transactions) on retriable errors, 
as specified by the sink connector, before Flink fails and potentially 
restarts.");
+
+    private SinkOptions() {}
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
index b3195d286d6..9310a92504e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
@@ -49,7 +50,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -118,6 +118,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
     private long lastCompletedCheckpointId = -1;
     private SimpleVersionedSerializer<CommT> committableSerializer;
     private SinkCommitterMetricGroup metricGroup;
+    private int maxRetries;
 
     @Nullable private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
     @Nullable private SimpleVersionedSerializer<GlobalCommT> 
globalCommittableSerializer;
@@ -148,6 +149,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
             globalCommitter = gc.getGlobalCommitter();
             globalCommittableSerializer = gc.getGlobalCommittableSerializer();
         }
+        maxRetries = 
config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
     }
 
     @Override
@@ -215,23 +217,14 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
         }
 
         lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointIdOrEOI);
-        // this is true for the last commit and we need to make sure that all 
committables are
-        // indeed committed as this function will never be invoked again
-        boolean waitForAllCommitted =
-                lastCompletedCheckpointId == EOI
-                        && committableCollector
-                                .getEndOfInputCommittable()
-                                
.map(CheckpointCommittableManager::hasGloballyReceivedAll)
-                                .orElse(false);
-        do {
-            for (CheckpointCommittableManager<CommT> committable :
-                    
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
-                if (committable.hasGloballyReceivedAll()) {
-                    committable.commit(committer);
-                }
+        for (CheckpointCommittableManager<CommT> checkpointManager :
+                
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+            if (!checkpointManager.hasGloballyReceivedAll()) {
+                return;
             }
-            committableCollector.compact();
-        } while (waitForAllCommitted && !committableCollector.isFinished());
+            checkpointManager.commit(committer, maxRetries);
+            committableCollector.remove(checkpointManager);
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index d155f5dd509..10ae86cf10d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -22,12 +22,14 @@ import 
org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -65,7 +67,6 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
         implements OneInputStreamOperator<CommittableMessage<CommT>, 
CommittableMessage<CommT>>,
                 BoundedOneInput {
 
-    private static final long RETRY_DELAY = 1000;
     private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final FunctionWithException<CommitterInitContext, 
Committer<CommT>, IOException>
             committerSupplier;
@@ -76,6 +77,7 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
     private long lastCompletedCheckpointId = -1;
+    private int maxRetries;
 
     private boolean endInput = false;
 
@@ -111,6 +113,7 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
         super.setup(containingTask, config, output);
         metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
         committableCollector = CommittableCollector.of(metricGroup);
+        maxRetries = 
config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
     }
 
     @Override
@@ -161,41 +164,42 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
 
     private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
         long completedCheckpointId = endInput ? EOI : 
lastCompletedCheckpointId;
-        do {
-            for (CheckpointCommittableManager<CommT> manager :
-                    
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
-                commitAndEmit(manager);
-            }
-            // !committableCollector.isFinished() indicates that we should 
retry
-            // Retry should be done here if this is a final checkpoint 
(indicated by endInput)
-            // WARN: this is an endless retry, may make the job stuck while 
finishing
-        } while (!committableCollector.isFinished() && endInput);
-
-        if (!committableCollector.isFinished()) {
-            // if not endInput, we can schedule retrying later
-            retryWithDelay();
+        for (CheckpointCommittableManager<CommT> checkpointManager :
+                
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+            // ensure that all committables of the first checkpoint are fully 
committed before
+            // attempting the next committable
+            commitAndEmit(checkpointManager);
+            committableCollector.remove(checkpointManager);
         }
-        committableCollector.compact();
     }
 
     private void commitAndEmit(CheckpointCommittableManager<CommT> 
committableManager)
             throws IOException, InterruptedException {
-        Collection<CommittableWithLineage<CommT>> committed = 
committableManager.commit(committer);
-        if (emitDownstream && committableManager.isFinished()) {
-            int subtaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-            int numberOfSubtasks = 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
-            output.collect(
-                    new 
StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
-            for (CommittableWithLineage<CommT> committable : committed) {
-                output.collect(new 
StreamRecord<>(committable.withSubtaskId(subtaskId)));
-            }
+        committableManager.commit(committer, maxRetries);
+        if (emitDownstream) {
+            emit(committableManager);
         }
     }
 
-    private void retryWithDelay() {
-        processingTimeService.registerTimer(
-                processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
-                ts -> commitAndEmitCheckpoints());
+    private void emit(CheckpointCommittableManager<CommT> committableManager) {
+        int subtaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        int numberOfSubtasks = 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        long checkpointId = committableManager.getCheckpointId();
+        Collection<CommT> committables = 
committableManager.getSuccessfulCommittables();
+        output.collect(
+                new StreamRecord<>(
+                        new CommittableSummary<>(
+                                subtaskId,
+                                numberOfSubtasks,
+                                checkpointId,
+                                committables.size(),
+                                0,
+                                0)));
+        for (CommT committable : committables) {
+            output.collect(
+                    new StreamRecord<>(
+                            new CommittableWithLineage<>(committable, 
checkpointId, subtaskId)));
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
index ada7f8640f0..4e34fbbe698 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java
@@ -20,8 +20,6 @@ package 
org.apache.flink.streaming.runtime.operators.sink.committables;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -50,12 +48,6 @@ public interface CheckpointCommittableManager<CommT> {
     /** Returns the number of upstream subtasks belonging to the checkpoint. */
     int getNumberOfSubtasks();
 
-    /**
-     * Returns a summary of the current commit progress for the emitting 
subtask identified by the
-     * parameters.
-     */
-    CommittableSummary<CommT> getSummary(int emittingSubtaskId, int 
emittingNumberOfSubtasks);
-
     boolean isFinished();
 
     /**
@@ -69,8 +61,10 @@ public interface CheckpointCommittableManager<CommT> {
      * checkpoint have been received.
      *
      * @param committer used to commit to the external system
-     * @return successfully committed committables with meta information
+     * @param maxRetries number of retries for pending committables before an IOException is thrown
      */
-    Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> 
committer)
+    void commit(Committer<CommT> committer, int maxRetries)
             throws IOException, InterruptedException;
+
+    Collection<CommT> getSuccessfulCommittables();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index bb6cceead47..00c3b7f65d4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -126,24 +126,6 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
         return checkNotNull(committables, "Unknown subtask for %s", subtaskId);
     }
 
-    @Override
-    public CommittableSummary<CommT> getSummary(
-            int emittingSubtaskId, int emittingNumberOfSubtasks) {
-        return new CommittableSummary<>(
-                emittingSubtaskId,
-                emittingNumberOfSubtasks,
-                checkpointId,
-                subtasksCommittableManagers.values().stream()
-                        
.mapToInt(SubtaskCommittableManager::getNumCommittables)
-                        .sum(),
-                subtasksCommittableManagers.values().stream()
-                        .mapToInt(SubtaskCommittableManager::getNumPending)
-                        .sum(),
-                subtasksCommittableManagers.values().stream()
-                        .mapToInt(SubtaskCommittableManager::getNumFailed)
-                        .sum());
-    }
-
     @Override
     public boolean isFinished() {
         return subtasksCommittableManagers.values().stream()
@@ -158,20 +140,34 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     @Override
-    public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> 
committer)
+    public void commit(Committer<CommT> committer, int maxRetries)
             throws IOException, InterruptedException {
-        Collection<CommitRequestImpl<CommT>> requests = 
getPendingRequests(true);
-        requests.forEach(CommitRequestImpl::setSelected);
-        committer.commit(new ArrayList<>(requests));
-        requests.forEach(CommitRequestImpl::setCommittedIfNoError);
-        Collection<CommittableWithLineage<CommT>> committed = drainFinished();
-        metricGroup.setCurrentPendingCommittablesGauge(() -> 
getPendingRequests(false).size());
-        return committed;
+        Collection<CommitRequestImpl<CommT>> requests = getPendingRequests();
+        for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; 
retry++) {
+            requests.forEach(CommitRequestImpl::setSelected);
+            committer.commit(new ArrayList<>(requests));
+            requests.forEach(CommitRequestImpl::setCommittedIfNoError);
+            requests = requests.stream().filter(r -> 
!r.isFinished()).collect(Collectors.toList());
+            metricGroup.setCurrentPendingCommittablesGauge(requests::size);
+        }
+        if (!requests.isEmpty()) {
+            throw new IOException(
+                    String.format(
+                            "Failed to commit %s committables after %s 
retries: %s",
+                            requests.size(), maxRetries, requests));
+        }
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean 
assertFull) {
+    @Override
+    public Collection<CommT> getSuccessfulCommittables() {
         return subtasksCommittableManagers.values().stream()
-                .peek(subtask -> assertReceivedAll(assertFull, subtask))
+                .flatMap(SubtaskCommittableManager::getSuccessfulCommittables)
+                .collect(Collectors.toList());
+    }
+
+    Collection<CommitRequestImpl<CommT>> getPendingRequests() {
+        return subtasksCommittableManagers.values().stream()
+                .peek(this::assertReceivedAll)
                 .flatMap(SubtaskCommittableManager::getPendingRequests)
                 .collect(Collectors.toList());
     }
@@ -192,20 +188,14 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
      * <p>This assertion will fail in case of bugs in the writer or in the 
pre-commit topology if
      * present.
      */
-    private void assertReceivedAll(boolean assertFull, 
SubtaskCommittableManager<CommT> subtask) {
+    private void assertReceivedAll(SubtaskCommittableManager<CommT> subtask) {
         Preconditions.checkArgument(
-                !assertFull || subtask.hasReceivedAll(),
+                subtask.hasReceivedAll(),
                 "Trying to commit incomplete batch of committables subtask=%s, 
manager=%s",
                 subtask.getSubtaskId(),
                 this);
     }
 
-    Collection<CommittableWithLineage<CommT>> drainFinished() {
-        return subtasksCommittableManagers.values().stream()
-                .flatMap(subtask -> subtask.drainCommitted().stream())
-                .collect(Collectors.toList());
-    }
-
     CheckpointCommittableManagerImpl<CommT> 
merge(CheckpointCommittableManagerImpl<CommT> other) {
         checkArgument(other.checkpointId == checkpointId);
         for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry 
:
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 37a7b72f7b3..098de7f186e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -132,7 +133,7 @@ public class CommittableCollector<CommT> {
      */
     public Collection<? extends CheckpointCommittableManager<CommT>> 
getCheckpointCommittablesUpTo(
             long checkpointId) {
-        return checkpointCommittables.headMap(checkpointId, true).values();
+        return new ArrayList<>(checkpointCommittables.headMap(checkpointId, 
true).values());
     }
 
     /**
@@ -208,9 +209,9 @@ public class CommittableCollector<CommT> {
         return checkNotNull(committables, "Unknown checkpoint for %s", 
committable);
     }
 
-    /** Removes all metadata about checkpoints of which all committables are 
fully committed. */
-    public void compact() {
-        
checkpointCommittables.values().removeIf(CheckpointCommittableManagerImpl::isFinished);
+    /** Removes the manager for a specific checkpoint and all its metadata. */
+    public void remove(CheckpointCommittableManager<CommT> manager) {
+        checkpointCommittables.remove(manager.getCheckpointId());
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index 185d6e0fe78..3128a2d083c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -35,6 +35,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState.COMMITTED;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -138,6 +139,12 @@ class SubtaskCommittableManager<CommT> {
         return requests.stream().filter(c -> !c.isFinished());
     }
 
+    Stream<CommT> getSuccessfulCommittables() {
+        return getRequests().stream()
+                .filter(c -> c.getState() == COMMITTED)
+                .map(CommitRequestImpl::getCommittable);
+    }
+
     /**
      * Iterates through all currently registered {@link #requests} and returns 
all {@link
      * CommittableWithLineage} that could be successfully committed.
@@ -184,7 +191,7 @@ class SubtaskCommittableManager<CommT> {
         return checkpointId;
     }
 
-    Deque<CommitRequestImpl<CommT>> getRequests() {
+    Collection<CommitRequestImpl<CommT>> getRequests() {
         return requests;
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index 865a24175ae..641a651e2e4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -63,6 +63,7 @@ class GlobalCommitterOperatorTest {
             if (commitOnInput) {
                 assertThat(committer.committed).containsExactly(1, 2);
             } else {
+                // 3PC behavior
                 assertThat(committer.committed).isEmpty();
                 testHarness.notifyOfCompletedCheckpoint(cid + 1);
                 assertThat(committer.committed).containsExactly(1, 2);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index e63ec66c41b..756ea0c8022 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -26,6 +27,7 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.assertj.core.api.ListAssert;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -201,10 +203,6 @@ abstract class CommitterOperatorTestBase {
         testHarness.processElement(new StreamRecord<>(second));
 
         final OperatorSubtaskState snapshot = 
testHarness.snapshot(checkpointId, 2L);
-
-        // Trigger first checkpoint but committer needs retry
-        testHarness.notifyOfCompletedCheckpoint(0);
-
         assertThat(testHarness.getOutput()).isEmpty();
         testHarness.close();
 
@@ -244,6 +242,35 @@ abstract class CommitterOperatorTestBase {
         restored.close();
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1})
+    void testNumberOfRetries(int numRetries) throws Exception {
+        try (OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        createTestHarness(
+                                sinkWithPostCommitWithRetry().sink, false, 
true, 1, 1, 0)) {
+            testHarness
+                    .getStreamConfig()
+                    .getConfiguration()
+                    .set(SinkOptions.COMMITTER_RETRIES, numRetries);
+            testHarness.open();
+
+            long ckdId = 1L;
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableSummary<>(0, 1, ckdId, 
1, 0, 0)));
+            testHarness.processElement(
+                    new StreamRecord<>(new CommittableWithLineage<>("1", 
ckdId, 0)));
+            AbstractThrowableAssert<?, ? extends Throwable> throwableAssert =
+                    assertThatCode(() -> 
testHarness.notifyOfCompletedCheckpoint(ckdId));
+            if (numRetries == 0) {
+                throwableAssert.hasMessageContaining("Failed to commit 1 
committables");
+            } else {
+                throwableAssert.doesNotThrowAnyException();
+            }
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index fcc2f559043..56804ce9f2b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -41,6 +40,7 @@ class CheckpointCommittableManagerImplTest {
 
     private static final SinkCommitterMetricGroup METRIC_GROUP =
             MetricsGroupTestUtils.mockCommitterMetricGroup();
+    private static final int MAX_RETRIES = 1;
 
     @Test
     void testAddSummary() {
@@ -74,19 +74,20 @@ class CheckpointCommittableManagerImplTest {
 
         final Committer<Integer> committer = new NoOpCommitter();
         // Only commit fully received committables
-        assertThatCode(() -> checkpointCommittables.commit(committer))
+        assertThatCode(() -> checkpointCommittables.commit(committer, 
MAX_RETRIES))
                 .hasMessageContaining("Trying to commit incomplete batch of 
committables");
 
         // Even on retry
-        assertThatCode(() -> checkpointCommittables.commit(committer))
+        assertThatCode(() -> checkpointCommittables.commit(committer, 
MAX_RETRIES))
                 .hasMessageContaining("Trying to commit incomplete batch of 
committables");
 
         // Add missing committable
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 
1L, 2));
         // Commit all committables
-        assertThat(checkpointCommittables.commit(committer))
+        assertThatCode(() -> checkpointCommittables.commit(committer, 
MAX_RETRIES))
+                .doesNotThrowAnyException();
+        assertThat(checkpointCommittables.getSuccessfulCommittables())
                 .hasSize(3)
-                .extracting(CommittableWithLineage::getCommittable)
                 .containsExactlyInAnyOrder(3, 4, 5);
     }
 
@@ -118,10 +119,9 @@ class CheckpointCommittableManagerImplTest {
         CheckpointCommittableManagerImpl<Integer> copy = original.copy();
 
         assertThat(copy.getCheckpointId()).isEqualTo(checkpointId);
-        SinkV2Assertions.assertThat(copy.getSummary(subtaskId, 
numberOfSubtasks))
-                .hasNumberOfSubtasks(numberOfSubtasks)
-                .hasSubtaskId(subtaskId)
-                .hasCheckpointId(checkpointId);
+        assertThat(copy)
+                .returns(numberOfSubtasks, 
CheckpointCommittableManagerImpl::getNumberOfSubtasks)
+                .returns(checkpointId, 
CheckpointCommittableManagerImpl::getCheckpointId);
     }
 
     private static class NoOpCommitter implements Committer<Integer> {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
index 0b4bddd36a4..c90be1cd03c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.jupiter.api.Test;
@@ -205,11 +204,10 @@ class CommittableCollectorSerializerTest {
                                     
checkpointCommittableManager.getSubtaskCommittableManager(
                                             subtaskId);
 
-                            SinkV2Assertions.assertThat(
-                                            
checkpointCommittableManager.getSummary(
-                                                    subtaskId, 
numberOfSubtasks))
-                                    .hasSubtaskId(subtaskId)
-                                    .hasNumberOfSubtasks(numberOfSubtasks);
+                            assertThat(checkpointCommittableManager)
+                                    .returns(
+                                            numberOfSubtasks,
+                                            
CheckpointCommittableManagerImpl::getNumberOfSubtasks);
 
                             assertPendingRequests(
                                     subtaskCommittableManager, 
expectedPendingRequestCount);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index b8a58aa6c65..6e55adcc0c5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -21,7 +21,6 @@ package 
org.apache.flink.streaming.runtime.operators.sink.committables;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
 
@@ -59,7 +58,8 @@ class CommittableCollectorTest {
         Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable =
                 committableCollector.getEndOfInputCommittable();
         assertThat(endOfInputCommittable).isPresent();
-        SinkV2Assertions.assertThat(endOfInputCommittable.get().getSummary(1, 
1))
-                .hasCheckpointId(EOI);
+        assertThat(endOfInputCommittable)
+                .get()
+                .returns(EOI, CheckpointCommittableManager::getCheckpointId);
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
index e240de21066..dfaaf17d884 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
@@ -406,17 +406,17 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     /** A {@link Committer} that always re-commits the committables data it 
received. */
     public static class RetryOnceCommitter extends DefaultCommitter {
 
-        private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
+        private final Set<String> seen = new LinkedHashSet<>();
 
         @Override
         public void commit(Collection<CommitRequest<String>> committables) {
             committables.forEach(
                     c -> {
-                        if (seen.remove(c)) {
+                        if (seen.remove(c.getCommittable())) {
                             checkNotNull(committedData);
                             committedData.add(c);
                         } else {
-                            seen.add(c);
+                            seen.add(c.getCommittable());
                             c.retryLater();
                         }
                     });


Reply via email to