pnowojski commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r660541918



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -695,6 +697,27 @@ protected void afterInvoke() throws Exception {
         // close all operators in a chain effect way
         operatorChain.closeOperators(actionExecutor);
 
+        // If checkpoints are enabled, wait for all the records to be processed by the downstream
+        // tasks. During this process, this task could coordinate with its downstream tasks to
+        // continue performing checkpoints.
+        if (configuration.isCheckpointingEnabled()) {
+            LOG.debug("Waiting for all the records to be processed by the downstream tasks.");
+            CompletableFuture<Void> combineFuture =
+                    FutureUtils.waitForAll(
+                            Arrays.stream(getEnvironment().getAllWriters())
+                                    .map(
+                                            FunctionUtils.uncheckedFunction(
+                                                    ResultPartitionWriter
+                                                            ::getAllRecordsProcessedFuture))
+                                    .collect(Collectors.toList()));
+
+            MailboxExecutor mailboxExecutor =
+                    mailboxProcessor.getMailboxExecutor(TaskMailbox.MIN_PRIORITY);
+            while (!combineFuture.isDone()) {

Review comment:
   I've only taken a very brief look at this PR and this problem so far (I will take one more look later), so take what I write with a grain of salt :)
   
   I think this is a bit hacky. It seems to me that in a proper solution, this should (or at least could) be handled by the regular default action (`StreamTask#processInput`) and by spinning the mailbox via `runMailboxLoop()`. I think a very similar problem was solved by Anton when he split `runMailboxLoop()` between `StreamTask#executeRestore` and `StreamTask#executeInvoke`: after the first one, the mailbox is suspended, and it is restarted in the second one. It seems you could replace this semi-busy loop with just another such step (after `executeInvoke` the mailbox would be suspended, and then restarted in `afterInvoke()`)?
   
   For reference, you might want to take a look at `MailboxProcessor#suspend`.
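   To make the "suspend, then restart the mailbox in `afterInvoke()`" idea more concrete, here is a minimal toy sketch. It assumes a heavily simplified, mail-only mailbox: `ToyMailbox`, `runDemo` and the mail names are hypothetical and are not Flink's `MailboxProcessor` API, and the toy has no default action (unlike `StreamTask#processInput`). Instead of polling `combineFuture.isDone()` in a semi-busy loop, the second `runMailboxLoop()` call blocks while idle, keeps running checkpoint-like mails, and returns once the future's completion callback enqueues a suspend mail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Toy model of "suspend the mailbox after executeInvoke, restart it in afterInvoke".
 * ToyMailbox is hypothetical; it is NOT Flink's MailboxProcessor.
 */
class MailboxSuspendSketch {

    /** Hypothetical single-threaded mailbox: mails run on the thread calling runMailboxLoop(). */
    static final class ToyMailbox {
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
        // Only read and written on the mailbox thread (inside runMailboxLoop and mails).
        private boolean suspended;

        /** Enqueues an action, e.g. a checkpoint trigger; safe to call from any thread. */
        void submit(Runnable mail) {
            mails.add(mail);
        }

        /** Enqueues a mail that makes the current (or next) runMailboxLoop() call return. */
        void suspend() {
            submit(() -> suspended = true);
        }

        /** Runs mails in FIFO order until suspended; blocks in take() while idle. */
        void runMailboxLoop() throws InterruptedException {
            suspended = false;
            while (!suspended) {
                mails.take().run();
            }
        }
    }

    /** Runs both phases and returns the order in which the work executed. */
    static List<String> runDemo() {
        ToyMailbox mailbox = new ToyMailbox();
        List<String> log = new ArrayList<>(); // only touched on this (the mailbox) thread
        try {
            // Phase 1, loosely "executeInvoke": run pending work, then suspend the loop.
            mailbox.submit(() -> log.add("process input"));
            mailbox.suspend();
            mailbox.runMailboxLoop();

            // Phase 2, loosely "afterInvoke": stand-in for the combined
            // "all records processed" future from the PR.
            CompletableFuture<Void> allRecordsProcessed = new CompletableFuture<>();
            allRecordsProcessed.whenComplete((ignored, error) -> mailbox.suspend());

            // Simulated downstream: a checkpoint mail arrives while waiting, then the future completes.
            Thread downstream = new Thread(() -> {
                mailbox.submit(() -> log.add("checkpoint"));
                allRecordsProcessed.complete(null);
            });
            downstream.start();
            mailbox.runMailboxLoop(); // still serves mails, but blocks instead of spinning
            downstream.join();
            log.add("finished");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [process input, checkpoint, finished]
    }
}
```

   Enqueuing the suspension as a mail, rather than flipping a flag directly from the callback thread, wakes a loop that is blocked in `take()` and still works if the future is already complete when the callback is registered.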