kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613007347



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
                         String.format("Don't recognize event '%s' from task %d.", event, subtask));
             }
 
-            if (periodicTask != null) {
-                throw new Exception("periodic already running");
+            synchronized (recoveredTaskRunning) {
+                // signal the previous task that its recovered task is now running
+                final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
+                if (prevTaskFuture != null) {
+                    prevTaskFuture.complete(null);
+                }
+                // add a future for this task
+                recoveredTaskRunning.addLast(new CompletableFuture<>());
             }
-            periodicTask =
-                    executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+            // first, we hand this over to the mailbox thread, so we preserve order on operations,
+            // even if the action is only to do a thread safe scheduling into the scheduledExecutor
+            runInMailbox(
+                    () -> {
+                        checkState(!workLoopRunning);
+                        checkState(subtaskGateway != null);
+
+                        workLoopRunning = true;
+                        scheduleSingleAction();
+                    });
         }
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {

Review comment:
       I mean sending an `UnrecognizedEvent` to the operator so that the running task fails.
Currently, running tasks never fail on their own; they are only failed through global failover.
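
To illustrate the idea, here is a minimal, self-contained sketch in plain Java. It is not code from this PR and does not use Flink's classes: `Event`, `KnownEvent`, `SketchTask`, and the `UnrecognizedEvent` stand-in below are hypothetical names chosen for illustration only. The point is simply that a task which receives an event it does not recognize fails itself, rather than being torn down by a failure triggered elsewhere.

```java
import java.util.concurrent.CompletableFuture;

public class LocalFailureSketch {

    /** Hypothetical stand-in for an operator event; not Flink's OperatorEvent. */
    interface Event {}

    static final class KnownEvent implements Event {}

    /** Hypothetical stand-in for the event suggested in the comment above. */
    static final class UnrecognizedEvent implements Event {}

    /** Hypothetical task that fails itself when it receives an unknown event. */
    static final class SketchTask {
        // Completed exceptionally once the task has failed.
        private final CompletableFuture<Void> termination = new CompletableFuture<>();

        void handleEvent(Event event) {
            if (event instanceof KnownEvent) {
                return; // normal processing would happen here
            }
            // Assumption: an unknown event is treated as a local task failure.
            termination.completeExceptionally(
                    new IllegalStateException(
                            "Don't recognize event " + event.getClass().getSimpleName()));
        }

        boolean hasFailed() {
            return termination.isCompletedExceptionally();
        }
    }

    public static void main(String[] args) {
        SketchTask task = new SketchTask();
        task.handleEvent(new KnownEvent());
        System.out.println("failed after known event: " + task.hasFailed());
        task.handleEvent(new UnrecognizedEvent());
        System.out.println("failed after unrecognized event: " + task.hasFailed());
    }
}
```

In the actual test, the failure would presumably surface through the task's own event handler, so the recovery path would be exercised by a task-local failure as well as by global failover.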




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

