dawidwys commented on a change in pull request #14857:
URL: https://github.com/apache/flink/pull/14857#discussion_r665235501



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -406,6 +406,31 @@ public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions)
         }
     }
 
+    public void waitForPendingCheckpoints() throws Exception {
+        if (!enableCheckpointAfterTasksFinished) {
+            return;
+        }
+
+        List<AsyncCheckpointRunnable> asyncCheckpointRunnables;
+        synchronized (lock) {
+            asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+        }
+
+        // Waits for each checkpoint independently.
+        asyncCheckpointRunnables.forEach(
+                ar -> {
+                    try {
+                        ar.getFinishedFuture().get();
+                    } catch (Exception e) {
+                        LOG.error(

Review comment:
       Does it make sense to log this exception? As far as I can tell, the only way we can end up here is an `InterruptedException` from the `get`. Exceptions from the runnable are not forwarded to the `Future`, are they? We only ever complete the `Future` with `complete(null);`
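A minimal, self-contained sketch of the point above, using only JDK types. `runAndSignal` is a hypothetical stand-in for `AsyncCheckpointRunnable` (not its actual code): it handles a failure internally and only ever calls `complete(null)`, so `get()` on its future returns normally. Only a future completed via `completeExceptionally(...)` surfaces as an `ExecutionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FinishedFutureSemantics {

    // Hypothetical stand-in for AsyncCheckpointRunnable: the work may fail, but the
    // "finished" future is only ever completed with complete(null).
    static CompletableFuture<Void> runAndSignal(Runnable work) {
        CompletableFuture<Void> finished = new CompletableFuture<>();
        try {
            work.run();
        } catch (RuntimeException e) {
            // The failure is handled here and is NOT forwarded to the future.
            System.out.println("handled inside the runnable: " + e.getMessage());
        } finally {
            finished.complete(null);
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> finished =
                runAndSignal(
                        () -> {
                            throw new IllegalStateException("snapshot failed");
                        });
        try {
            // The future was completed normally, so get() returns null; no ExecutionException.
            finished.get();
            System.out.println("get() returned normally");
        } catch (ExecutionException e) {
            System.out.println("not reached: " + e.getCause());
        }

        // For contrast: only completeExceptionally(...) surfaces as an ExecutionException.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.get();
        } catch (ExecutionException e) {
            System.out.println("ExecutionException caused by: " + e.getCause().getMessage());
        }
    }
}
```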
   
   I'd suggest doing the following instead:
   
   ```
           // Combine the pending futures while holding the lock, but block outside of it.
           final CompletableFuture<Void> combinedFuture;
           synchronized (lock) {
               combinedFuture =
                       FutureUtils.waitForAll(
                               checkpoints.values().stream()
                                       .map(AsyncCheckpointRunnable::getFinishedFuture)
                                       .collect(Collectors.toList()));
           }
   
           combinedFuture.get();
   ```
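The suggested pattern (build the combined future under the lock, wait on it outside the lock) can be exercised in isolation. This is a hypothetical sketch, not Flink code: it uses the JDK's `CompletableFuture.allOf` as a stand-in for Flink's `FutureUtils.waitForAll`, a plain `Map` of futures in place of the `checkpoints` map of `AsyncCheckpointRunnable`s, and it assumes that a finishing checkpoint removes itself from the map under the same lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class PendingCheckpoints {
    private final Object lock = new Object();

    // Hypothetical stand-in for the coordinator's `checkpoints` map; each value plays
    // the role of AsyncCheckpointRunnable#getFinishedFuture().
    private final Map<Long, CompletableFuture<Void>> checkpoints = new HashMap<>();

    CompletableFuture<Void> register(long checkpointId) {
        CompletableFuture<Void> finished = new CompletableFuture<>();
        synchronized (lock) {
            checkpoints.put(checkpointId, finished);
        }
        return finished;
    }

    // Assumption: a finishing checkpoint removes itself from the map under the same lock.
    void finish(long checkpointId) {
        final CompletableFuture<Void> finished;
        synchronized (lock) {
            finished = checkpoints.remove(checkpointId);
        }
        if (finished != null) {
            finished.complete(null);
        }
    }

    void waitForPendingCheckpoints() throws InterruptedException, ExecutionException {
        final CompletableFuture<Void> combinedFuture;
        synchronized (lock) {
            // Build the combined future from a consistent view of the pending checkpoints...
            combinedFuture =
                    CompletableFuture.allOf(
                            checkpoints.values().toArray(new CompletableFuture<?>[0]));
        }
        // ...but block outside the lock, so finish() can still acquire it.
        combinedFuture.get();
    }

    public static void main(String[] args) throws Exception {
        PendingCheckpoints coordinator = new PendingCheckpoints();
        coordinator.register(1L);
        coordinator.register(2L);

        // Finish both checkpoints from another thread while main waits.
        Thread finisher =
                new Thread(
                        () -> {
                            coordinator.finish(1L);
                            coordinator.finish(2L);
                        });
        finisher.start();
        coordinator.waitForPendingCheckpoints();
        finisher.join();
        System.out.println("all pending checkpoints finished");
    }
}
```

Holding the lock only while the combined future is created keeps the view of pending checkpoints consistent, while blocking outside it avoids a deadlock with checkpoints that need the same lock to complete.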

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -110,8 +109,6 @@ public void run() {
         FileSystemSafetyNet.initializeSafetyNetForThread();
         try {
 
-            registerConsumer.accept(this);

Review comment:
       Makes total sense! I did not spend enough time analyzing it before.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1864,6 +1866,61 @@ static void processMailTillCheckpointSucceeds(
         testHarness.getTaskStateManager().getWaitForReportLatch().await();
     }
 
+    @Test
+    public void testWaitingForPendingCheckpointsOnFinished() throws Exception {

Review comment:
       Currently, the test passes both with and without waiting for the pending 
checkpoints.
   
   Moreover I think there is a lot of timing assumptions that need to be met 
for the test to verify the intended behaviour. I could not think of a good way 
to fix all of them (would be best to remove the remaining `Thread#sleep()`), 
but what do you think about such test:
   
   ```
       @Test
       public void testWaitingForPendingCheckpointsOnFinished() throws Exception {
           OneShotLatch asyncCheckpointExecuted = new OneShotLatch();
           OneShotLatch canCheckpointBeAcknowledged = new OneShotLatch();
           OneShotLatch invokeCompleted = new OneShotLatch();
           TestCheckpointResponder responder =
                   new TestCheckpointResponder() {
                       @Override
                       public void acknowledgeCheckpoint(
                               JobID jobID,
                               ExecutionAttemptID executionAttemptID,
                               long checkpointId,
                               CheckpointMetrics checkpointMetrics,
                               TaskStateSnapshot subtaskState) {
                           // Signal that the async part ran, then hold the ack until the test releases it.
                           try {
                               asyncCheckpointExecuted.trigger();
                               canCheckpointBeAcknowledged.await();
                           } catch (Exception e) {
                               throw new RuntimeException(e);
                           }
                       }
                   };
   
           CompletableFuture<Void> taskClosed =
                   CompletableFuture.runAsync(
                           () -> {
                               try (StreamTaskMailboxTestHarness<String> harness =
                                       new StreamTaskMailboxTestHarnessBuilder<>(
                                                       OneInputStreamTask::new,
                                                       BasicTypeInfo.STRING_TYPE_INFO)
                                               .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
                                               .modifyStreamConfig(
                                                       config -> config.setCheckpointingEnabled(true))
                                               .setCheckpointResponder(responder)
                                               .setupOperatorChain(new EmptyOperator())
                                               .finishForSingletonOperatorChain(
                                                       StringSerializer.INSTANCE)
                                               .build()) {
   
                                   harness.streamTask
                                           .getCheckpointCoordinator()
                                           .setEnableCheckpointAfterTasksFinished(true);
   
                                   harness.streamTask.triggerCheckpointOnBarrier(
                                           new CheckpointMetaData(1, 101),
                                           CheckpointOptions.forCheckpointWithDefaultLocation(),
                                           new CheckpointMetricsBuilder()
                                                   .setBytesProcessedDuringAlignment(0L)
                                                   .setAlignmentDurationNanos(0L));
   
                                   harness.waitForTaskCompletion();
                                   invokeCompleted.trigger();
                                   harness.finishProcessing();
                               } catch (Exception e) {
                                   // Rethrow so that taskClosed completes exceptionally
                                   // instead of silently succeeding.
                                   throw new RuntimeException(e);
                               }
                           },
                           Executors.newSingleThreadExecutor());
   
           asyncCheckpointExecuted.await();
           invokeCompleted.await();
           // Give the task some time to (incorrectly) close before the checkpoint is acknowledged.
           Thread.sleep(500);
           assertFalse(taskClosed.isDone());
           canCheckpointBeAcknowledged.trigger();
           taskClosed.get();
       }
   ```
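To make the intended ordering of the latches explicit, the choreography of the proposed test can be reduced to plain JDK types. In this hypothetical sketch, `CountDownLatch` stands in for Flink's `OneShotLatch`, a blocking `runAsync` stands in for the acknowledging `TestCheckpointResponder`, and `pendingCheckpoint.join()` stands in for `waitForPendingCheckpoints()`; none of it is the Flink test-harness API. It also swaps the `Thread.sleep(500)` + `isDone()` pair for a bounded `get(...)` that is expected to time out, which reads more directly but is still a timing assumption: a task that closes early only after the timeout would go unnoticed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class WaitForPendingCheckpointSketch {

    /** Returns true if the simulated task did NOT close before the ack was released. */
    static boolean closeWaitsForAck() throws Exception {
        CountDownLatch asyncCheckpointExecuted = new CountDownLatch(1);
        CountDownLatch canCheckpointBeAcknowledged = new CountDownLatch(1);
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        try {
            // Async part of the checkpoint: blocks in "acknowledge" until released.
            CompletableFuture<Void> pendingCheckpoint =
                    CompletableFuture.runAsync(
                            () -> {
                                asyncCheckpointExecuted.countDown();
                                try {
                                    canCheckpointBeAcknowledged.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    throw new RuntimeException(e);
                                }
                            },
                            asyncPool);

            // Simulated task close: waits for the pending checkpoint before completing.
            CompletableFuture<Void> taskClosed =
                    CompletableFuture.runAsync(() -> pendingCheckpoint.join(), taskThread);

            asyncCheckpointExecuted.await();
            boolean closedEarly;
            try {
                // Bounded negative check instead of Thread.sleep(500) + isDone().
                taskClosed.get(500, TimeUnit.MILLISECONDS);
                closedEarly = true;
            } catch (TimeoutException expected) {
                closedEarly = false;
            }
            canCheckpointBeAcknowledged.countDown();
            taskClosed.get(10, TimeUnit.SECONDS);
            return !closedEarly;
        } finally {
            asyncPool.shutdownNow();
            taskThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("close waited for ack: " + closeWaitsForAck());
    }
}
```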




-- 
This is an automated message from the Apache Git Service.