rkhachatryan commented on a change in pull request #15550:
URL: https://github.com/apache/flink/pull/15550#discussion_r610676693
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -932,9 +932,9 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
-
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return task.triggerCheckpointBarrier(
+ checkpointId, checkpointTimestamp, checkpointOptions)
+ .thenApply(result -> Acknowledge.get());
Review comment:
I think this future should be completed as soon as the trigger succeeds, i.e.
immediately after `task.triggerCheckpointBarrier` returns. From
`TaskExecutorGateway`:
```
* @return Future acknowledge if the checkpoint has been successfully triggered
```
With the current change, it is completed only when the sync phase completes.
Given the above and that the result is still ignored in
`RpcTaskManagerGateway`, I'm a bit skeptical of returning the future through
the call chain.
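To make the difference concrete, here is a rough sketch. All names are hypothetical stand-ins (a plain `String` instead of `Acknowledge`, a bare future instead of the real sync phase), so it only illustrates the timing, not the actual Flink code:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for illustration; not the actual Flink classes.
class TriggerAckSketch {

    /** Stand-in for the changed Task method: the future completes with the sync phase. */
    static CompletableFuture<Boolean> triggerCheckpointBarrier(
            CompletableFuture<Boolean> syncPhase) {
        return syncPhase;
    }

    /** Variant in this PR: the acknowledgement is chained on the sync phase result. */
    static CompletableFuture<String> ackAfterSyncPhase(CompletableFuture<Boolean> syncPhase) {
        return triggerCheckpointBarrier(syncPhase).thenApply(result -> "ACK");
    }

    /** Variant matching the Javadoc: acknowledge as soon as the trigger call returns. */
    static CompletableFuture<String> ackOnTrigger(CompletableFuture<Boolean> syncPhase) {
        triggerCheckpointBarrier(syncPhase); // result intentionally ignored
        return CompletableFuture.completedFuture("ACK");
    }

    public static void main(String[] args) {
        // The sync phase has been triggered but has not finished yet.
        CompletableFuture<Boolean> syncPhase = new CompletableFuture<>();
        System.out.println("chained ack done:   " + ackAfterSyncPhase(syncPhase).isDone());
        System.out.println("immediate ack done: " + ackOnTrigger(syncPhase).isDone());
    }
}
```

In the chained variant the caller only sees the acknowledgement once the sync phase future is completed, which is later than "successfully triggered".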
WDYT?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1277,7 +1277,7 @@ public void requestPartitionProducerState(
* @param checkpointTimestamp The timestamp associated with the checkpoint.
* @param checkpointOptions Options for performing this checkpoint.
*/
- public void triggerCheckpointBarrier(
+ public CompletableFuture<Boolean> triggerCheckpointBarrier(
Review comment:
nit: if we do need to return the result, can we return a `Future` (not a
`CompletableFuture`), so that it can't be completed from outside accidentally?
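Roughly what I have in mind, as a sketch with a hypothetical class (`finishSyncPhase` is made up for illustration and is not a Flink method):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical class for illustration; not the actual Flink Task.
class FutureReturnTypeSketch {
    // Only this class holds the completable handle.
    private final CompletableFuture<Boolean> result = new CompletableFuture<>();

    /** Exposes the result as java.util.concurrent.Future, which has no complete(). */
    Future<Boolean> triggerCheckpointBarrier() {
        return result;
    }

    /** Made-up hook: the owner completes the future once the work is done. */
    void finishSyncPhase(boolean success) {
        result.complete(success);
    }

    public static void main(String[] args) throws Exception {
        FutureReturnTypeSketch task = new FutureReturnTypeSketch();
        Future<Boolean> future = task.triggerCheckpointBarrier();
        // future.complete(true); // would not compile: Future has no complete()
        task.finishSyncPhase(true);
        System.out.println(future.get());
    }
}
```

Two caveats: a caller can still `cancel()` the future or downcast it, so this only guards against accidents, and plain `Future` has no `thenApply`, so the `TaskExecutor` side would have to be adapted.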
##########
File path:
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
##########
@@ -384,24 +380,28 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
/** A {@link StreamTask} that simply waits to be terminated normally. */
public static class NoOpBlockingStreamTask extends NoOpStreamTask {
- private final transient OneShotLatch finishLatch;
+ private transient MailboxDefaultAction.Suspension suspension;
public NoOpBlockingStreamTask(final Environment environment) throws Exception {
super(environment);
- this.finishLatch = new OneShotLatch();
}
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
invokeLatch.countDown();
- finishLatch.await();
- controller.allActionsCompleted();
+ if (suspension == null) {
+ suspension = controller.suspendDefaultAction();
+ } else {
+ controller.allActionsCompleted();
+ }
Review comment:
This change removes the blocking from `NoOpBlockingStreamTask`, right?
So it's not clear to me what the tests are verifying now.
Probably they were written before the mailbox was introduced, when
different threads called `processInput` and `finishTask`, and they tested
that the two can't block each other.
Now they **can**, so the test seems invalid to me.
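To illustrate what I mean, here is a toy single-threaded mailbox loop. The names are hypothetical and this is not Flink's `MailboxProcessor`; in particular, having `finishTask` resume the suspension is an assumption of the sketch:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model for illustration; not Flink's mailbox implementation.
class MailboxSketch {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private boolean suspendedOnce;
    private boolean suspended;
    private boolean finished;

    void send(Runnable mail) {
        mails.add(mail);
    }

    /** Default action: suspends itself on the first call, finishes on the second. */
    private void processInput() {
        log.add("processInput");
        if (!suspendedOnce) {
            suspendedOnce = true;
            suspended = true; // like controller.suspendDefaultAction()
        } else {
            finished = true; // like controller.allActionsCompleted()
        }
    }

    /** A mail; resuming the default action here is an assumption of this sketch. */
    void finishTask() {
        log.add("finishTask");
        suspended = false;
    }

    /** One thread runs both the default action and the mails. */
    List<String> run() {
        while (!finished) {
            if (!suspended) {
                processInput();
            }
            Runnable mail = mails.poll();
            if (mail == null) {
                break; // toy: a real loop would wait for new mail here
            }
            mail.run();
        }
        return log;
    }

    public static void main(String[] args) {
        MailboxSketch box = new MailboxSketch();
        box.send(box::finishTask);
        System.out.println(box.run()); // [processInput, finishTask, processInput]
    }
}
```

Since everything runs on one thread, a `processInput` that blocked on a latch would keep the `finishTask` mail from ever running, and with the suspension nothing blocks at all.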
WDYT?