kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613007347
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
String.format("Don't recognize event '%s' from task %d.", event, subtask));
}
- if (periodicTask != null) {
- throw new Exception("periodic already running");
+ synchronized (recoveredTaskRunning) {
+ // signal the previous task that its recovered task is now running
+ final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
+ if (prevTaskFuture != null) {
+ prevTaskFuture.complete(null);
+ }
+ // add a future for this task
+ recoveredTaskRunning.addLast(new CompletableFuture<>());
}
- periodicTask =
- executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+ // first, we hand this over to the mailbox thread, so we preserve order on operations,
+ // even if the action is only to do a thread safe scheduling into the scheduledExecutor
+ runInMailbox(
+ () -> {
+ checkState(!workLoopRunning);
+ checkState(subtaskGateway != null);
+
+ workLoopRunning = true;
+ scheduleSingleAction();
+ });
}
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
Review comment:
I mean sending an `UnrecognizedEvent` to the operator to fail the running task.
Currently, running tasks do not fail on their own; they fail only through global failover.
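
A minimal sketch of that idea, assuming the task-side handler throws on any event type it does not recognize. Every name here (`UnrecognizedEventSketch`, `Event`, `IntegerEvent`, `TaskSketch`) is hypothetical and self-contained; none of it is Flink's actual API or the test's real classes.

```java
import java.util.ArrayList;
import java.util.List;

public class UnrecognizedEventSketch {

    /** Hypothetical stand-in for an operator event; not Flink's interface. */
    interface Event {}

    /** An event type the task knows how to handle. */
    static final class IntegerEvent implements Event {
        final int value;

        IntegerEvent(int value) {
            this.value = value;
        }
    }

    /** An event type the task does not know; the coordinator sends it to provoke a failure. */
    static final class UnrecognizedEvent implements Event {}

    /** Hypothetical task-side handler: collects known events, fails on anything else. */
    static final class TaskSketch {
        final List<Integer> received = new ArrayList<>();
        boolean failed;

        void handleEvent(Event event) {
            if (event instanceof IntegerEvent) {
                received.add(((IntegerEvent) event).value);
            } else {
                // The failure originates in the task itself, not in a global failover.
                failed = true;
                throw new IllegalStateException(
                        "Unrecognized event: " + event.getClass().getSimpleName());
            }
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        task.handleEvent(new IntegerEvent(1));
        try {
            task.handleEvent(new UnrecognizedEvent());
        } catch (IllegalStateException e) {
            System.out.println("task failed: " + e.getMessage());
        }
    }
}
```

With something along these lines, the test could cover a task-local failure path in addition to the global failover it exercises today.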