StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613086839
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
String.format("Don't recognize event '%s' from task %d.", event, subtask));
}
- if (periodicTask != null) {
- throw new Exception("periodic already running");
+ synchronized (recoveredTaskRunning) {
+ // signal the previous task that its recovered task is now running
+ final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
+ if (prevTaskFuture != null) {
+ prevTaskFuture.complete(null);
+ }
+ // add a future for this task
+ recoveredTaskRunning.addLast(new CompletableFuture<>());
}
- periodicTask =
- executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+ // first, we hand this over to the mailbox thread, so we preserve order on operations,
+ // even if the action is only to do a thread safe scheduling into the scheduledExecutor
+ runInMailbox(
+ () -> {
+ checkState(!workLoopRunning);
+ checkState(subtaskGateway != null);
+
+ workLoopRunning = true;
+ scheduleSingleAction();
+ });
}
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
Review comment:
I think that is a nice idea. Ideally, the test would have both kinds of
failure (globalFailover from the master, and task failure).
I expect this to just work, since it doesn't matter to the scheduler where
the failure comes from.
Let's do that in a follow-up patch that enhances the test so that it covers
both types of failures.
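
For readers following the hunk above, here is a minimal, self-contained sketch of the hand-off pattern it introduces: each newly started task completes the future registered by its predecessor and then registers a fresh future for itself. The class name `RecoveryHandoff` and the method `taskStarted()` are hypothetical and exist only for illustration; only the `recoveredTaskRunning` deque and the `peekLast()` / `complete(null)` / `addLast(...)` sequence are taken from the diff.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration class; it is not part of the Flink test in this PR. */
final class RecoveryHandoff {

    // One future per started task, oldest first (name taken from the diff).
    private final ArrayDeque<CompletableFuture<?>> recoveredTaskRunning = new ArrayDeque<>();

    /**
     * Called when a (re)started task reports that it is running.
     * Returns the future that the next task will complete (an assumption made
     * here so that callers can observe the hand-off).
     */
    CompletableFuture<?> taskStarted() {
        synchronized (recoveredTaskRunning) {
            // Signal the previous task that its recovered task is now running.
            final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
            if (prevTaskFuture != null) {
                prevTaskFuture.complete(null);
            }
            // Add a future for this task.
            final CompletableFuture<?> ownFuture = new CompletableFuture<>();
            recoveredTaskRunning.addLast(ownFuture);
            return ownFuture;
        }
    }
}
```

In this sketch, the future of the first task stays incomplete until a second task starts. Presumably the test relies on that to know that recovery has actually happened, regardless of whether the failure was a global failover or a task failure.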