rkhachatryan commented on a change in pull request #15550:
URL: https://github.com/apache/flink/pull/15550#discussion_r610676693



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -932,9 +932,9 @@ public void heartbeatFromResourceManager(ResourceID 
resourceID) {
         final Task task = taskSlotTable.getTask(executionAttemptID);
 
         if (task != null) {
-            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, 
checkpointOptions);
-
-            return CompletableFuture.completedFuture(Acknowledge.get());
+            return task.triggerCheckpointBarrier(
+                            checkpointId, checkpointTimestamp, 
checkpointOptions)
+                    .thenApply(result -> Acknowledge.get());

Review comment:
       I think this future should be completed upon trigger succeeds, i.e. 
immediately after `task.triggerCheckpointBarrier` returns. From 
`TaskExecutorGateway`:
   ```
        * @return Future acknowledge if the checkpoint has been successfully 
triggered
   ```
   
   With the current change, it will be completed upon sync phase completion.
   
   Given the above and that the result is still ignored in 
`RpcTaskManagerGateway`, I'm a bit skeptical of returning the future through 
the call chain.
   
   WDYT?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1277,7 +1277,7 @@ public void requestPartitionProducerState(
      * @param checkpointTimestamp The timestamp associated with the checkpoint.
      * @param checkpointOptions Options for performing this checkpoint.
      */
-    public void triggerCheckpointBarrier(
+    public CompletableFuture<Boolean> triggerCheckpointBarrier(

Review comment:
       nit: if we do need to return the result, can we return a `Future` (not a 
`CompletableFuture`), which can't be completed outside accidentially?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
##########
@@ -384,24 +380,28 @@ protected void 
processInput(MailboxDefaultAction.Controller controller) throws E
     /** A {@link StreamTask} that simply waits to be terminated normally. */
     public static class NoOpBlockingStreamTask extends NoOpStreamTask {
 
-        private final transient OneShotLatch finishLatch;
+        private transient MailboxDefaultAction.Suspension suspension;
 
         public NoOpBlockingStreamTask(final Environment environment) throws 
Exception {
             super(environment);
-            this.finishLatch = new OneShotLatch();
         }
 
         @Override
         protected void processInput(MailboxDefaultAction.Controller 
controller) throws Exception {
             invokeLatch.countDown();
-            finishLatch.await();
-            controller.allActionsCompleted();
+            if (suspension == null) {
+                suspension = controller.suspendDefaultAction();
+            } else {
+                controller.allActionsCompleted();
+            }

Review comment:
       This change removes blocking from `NoOpBlockingStreamTask`, right?
   So it's not clear to me what the tests are verifying now.
   
   Probably, the they were written before the mailbox was introduced when 
different threads were calling `processInput` and `finishTask`. And it tested 
that they can't block each other.
   Now they **can**, so the test seems invalid to me.
   
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to