StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613005636



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, 
OperatorEvent event) throws Exc
                         String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
             }
 
-            if (periodicTask != null) {
-                throw new Exception("periodic already running");
+            synchronized (recoveredTaskRunning) {
+                // signal the previous task that its recovered task is now 
running
+                final CompletableFuture<?> prevTaskFuture = 
recoveredTaskRunning.peekLast();
+                if (prevTaskFuture != null) {
+                    prevTaskFuture.complete(null);
+                }
+                // add a future for this task
+                recoveredTaskRunning.addLast(new CompletableFuture<>());
             }
-            periodicTask =
-                    executor.scheduleWithFixedDelay(this, delay, delay, 
TimeUnit.MILLISECONDS);
+
+            // first, we hand this over to the mailbox thread, so we preserve 
order on operations,
+            // even if the action is only to do a thread safe scheduling into 
the scheduledExecutor
+            runInMailbox(
+                    () -> {
+                        checkState(!workLoopRunning);
+                        checkState(subtaskGateway != null);
+
+                        workLoopRunning = true;
+                        scheduleSingleAction();
+                    });
         }
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {

Review comment:
       Could you clarify what you mean by "draw some unrecognized events to 
fail subtask"?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to