fapaul commented on code in PR #25547:
URL: https://github.com/apache/flink/pull/25547#discussion_r1818576013


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java:
##########
@@ -39,11 +39,21 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
     private final long checkpointId;
     /** The number of committables coming from the given subtask in the particular checkpoint. */
     private final int numberOfCommittables;
+
+    @Deprecated
     /** The number of committables that have not been successfully committed. */
     private final int numberOfPendingCommittables;
+
+    @Deprecated
     /** The number of committables that are not retried and have been failed. */
     private final int numberOfFailedCommittables;

Review Comment:
   Why do we deprecate failed committables? I still see some value in providing
   the possibility to discard unrecoverable committables. The change also looks
   unrelated to the retry mechanism.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -164,41 +165,37 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
 
     private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
         long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
-        do {
-            for (CheckpointCommittableManager<CommT> manager :
-                    committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
-                commitAndEmit(manager);
-            }
-            // !committableCollector.isFinished() indicates that we should retry
-            // Retry should be done here if this is a final checkpoint (indicated by endInput)
-            // WARN: this is an endless retry, may make the job stuck while finishing
-        } while (!committableCollector.isFinished() && endInput);
-
-        if (!committableCollector.isFinished()) {
-            // if not endInput, we can schedule retrying later
-            retryWithDelay();
+        for (CheckpointCommittableManager<CommT> checkpointManager :
+                committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+            // ensure that all committables of the first checkpoint are fully committed before
+            // attempting the next committable
+            commitAndEmit(checkpointManager);
+            committableCollector.remove(checkpointManager);
         }
-        committableCollector.compact();
     }
 
     private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
             throws IOException, InterruptedException {
-        Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
-        if (emitDownstream && committableManager.isFinished()) {
-            int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-            int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
-            output.collect(
-                    new StreamRecord<>(committableManager.getSummary(subtaskId, numberOfSubtasks)));
-            for (CommittableWithLineage<CommT> committable : committed) {
-                output.collect(new StreamRecord<>(committable.withSubtaskId(subtaskId)));
-            }
+        committableManager.commit(committer, MAX_RETRIES);
+        if (emitDownstream) {
+            emit(committableManager);
         }
     }
 
-    private void retryWithDelay() {
-        processingTimeService.registerTimer(
-                processingTimeService.getCurrentProcessingTime() + RETRY_DELAY,
-                ts -> commitAndEmitCheckpoints());
+    private void emit(CheckpointCommittableManager<CommT> committableManager) {
+        int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();

Review Comment:
   Nit: I know this wasn't introduced by this PR, but why do we need to fetch
   the subtask id and the number of subtasks on every `emit` call?
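A minimal sketch of the alternative: read both values once (for example in the operator's setup or `open()` phase) and keep them in fields, since they stay the same for a running subtask. `TaskInfoLike` and `Emitter` are hypothetical stand-ins, not Flink classes; `TaskInfoLike` only mirrors the two `TaskInfo` getters used in the diff.

```java
public class CachedSubtaskInfoSketch {

    /** Hypothetical stand-in exposing the two TaskInfo getters used in the diff. */
    public interface TaskInfoLike {
        int getIndexOfThisSubtask();

        int getNumberOfParallelSubtasks();
    }

    /** Reads the subtask id and parallelism once and reuses them on every emit. */
    public static final class Emitter {
        private final TaskInfoLike taskInfo;
        private int subtaskId;
        private int numberOfSubtasks;

        public Emitter(TaskInfoLike taskInfo) {
            this.taskInfo = taskInfo;
        }

        /** Called once, analogous to an operator's setup/open hook. */
        public void open() {
            subtaskId = taskInfo.getIndexOfThisSubtask();
            numberOfSubtasks = taskInfo.getNumberOfParallelSubtasks();
        }

        /** Uses the cached fields; no TaskInfo lookup per call. */
        public String emit(long checkpointId) {
            return "checkpoint=" + checkpointId + " subtask=" + subtaskId + "/" + numberOfSubtasks;
        }
    }

    public static void main(String[] args) {
        int[] reads = {0};
        TaskInfoLike info = new TaskInfoLike() {
            @Override
            public int getIndexOfThisSubtask() {
                reads[0]++;
                return 1;
            }

            @Override
            public int getNumberOfParallelSubtasks() {
                reads[0]++;
                return 4;
            }
        };
        Emitter emitter = new Emitter(info);
        emitter.open();
        for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
            System.out.println(emitter.emit(checkpointId));
        }
        // two reads in total, regardless of how many times emit() ran
        System.out.println("TaskInfo reads: " + reads[0]);
    }
}
```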



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java:
##########
@@ -95,6 +94,7 @@
 public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
         implements OneInputStreamOperator<CommittableMessage<CommT>, Void> {
 
+    private static final int MAX_RETRIES = 10;

Review Comment:
   IMO we should make this a sink-specific Flink config option and allow users
   to opt out using the failed-committable mechanism.
   
   Currently, the number is duplicated in the committer and the global committer.
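A minimal sketch of a single shared setting, so that both operators resolve the retry count from one place instead of each declaring `MAX_RETRIES`. The key `sink.committer.retries` and the stub classes are hypothetical. In Flink itself, such a setting would more typically be declared as a `ConfigOption<Integer>` via `ConfigOptions.key(...).intType().defaultValue(...)` than parsed from a raw map as done here to keep the sketch self-contained.

```java
import java.util.Map;

public class CommitterRetriesOptionSketch {

    // Hypothetical key; the default of 10 matches the MAX_RETRIES constant in the diff.
    public static final String COMMITTER_RETRIES_KEY = "sink.committer.retries";
    public static final int DEFAULT_COMMITTER_RETRIES = 10;

    /** Single place that resolves the retry budget for both committer operators. */
    public static int committerRetries(Map<String, String> config) {
        String raw = config.get(COMMITTER_RETRIES_KEY);
        if (raw == null) {
            return DEFAULT_COMMITTER_RETRIES;
        }
        int retries = Integer.parseInt(raw.trim());
        if (retries < 0) {
            throw new IllegalArgumentException(
                    COMMITTER_RETRIES_KEY + " must be >= 0, but was " + retries);
        }
        return retries;
    }

    /** Hypothetical stand-in for CommitterOperator. */
    public static final class CommitterStub {
        public final int maxRetries;

        public CommitterStub(Map<String, String> config) {
            this.maxRetries = committerRetries(config);
        }
    }

    /** Hypothetical stand-in for GlobalCommitterOperator. */
    public static final class GlobalCommitterStub {
        public final int maxRetries;

        public GlobalCommitterStub(Map<String, String> config) {
            this.maxRetries = committerRetries(config);
        }
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of();
        Map<String, String> tuned = Map.of(COMMITTER_RETRIES_KEY, "3");
        System.out.println(new CommitterStub(defaults).maxRetries); // 10
        System.out.println(new GlobalCommitterStub(tuned).maxRetries); // 3
    }
}
```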


