StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r614165967



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
                         String.format("Don't recognize event '%s' from task %d.", event, subtask));
             }
 
-            if (periodicTask != null) {
-                throw new Exception("periodic already running");
+            synchronized (recoveredTaskRunning) {
+                // signal the previous task that its recovered task is now running
+                final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
+                if (prevTaskFuture != null) {
+                    prevTaskFuture.complete(null);
+                }
+                // add a future for this task
+                recoveredTaskRunning.addLast(new CompletableFuture<>());
             }
-            periodicTask =
-                    executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+            // first, we hand this over to the mailbox thread, so we preserve order on operations,
+            // even if the action is only to do a thread safe scheduling into the scheduledExecutor
+            runInMailbox(
+                    () -> {
+                        checkState(!workLoopRunning);
+                        checkState(subtaskGateway != null);
+
+                        workLoopRunning = true;
+                        scheduleSingleAction();
+                    });
         }
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {
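
For readers following the added `recoveredTaskRunning` logic: each newly started task completes the future registered by its predecessor and then registers its own, so a test can block on "attempt N" until its recovered attempt N+1 is running. Below is a minimal, stand-alone sketch of that hand-off, assuming `Void` futures and a hypothetical `onTaskStarted()` hook; the class and method names are illustrative and not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "signal the previous attempt" pattern; not the PR's code.
final class RecoverySignalSketch {

    // One future per started task attempt. The future of attempt N is completed
    // when attempt N+1 (its recovered successor) starts running.
    private final Deque<CompletableFuture<Void>> recoveredTaskRunning = new ArrayDeque<>();

    /** Called whenever a (possibly recovered) task attempt starts; returns this attempt's future. */
    CompletableFuture<Void> onTaskStarted() {
        synchronized (recoveredTaskRunning) {
            // signal the previous attempt that its recovered successor is now running
            final CompletableFuture<Void> previous = recoveredTaskRunning.peekLast();
            if (previous != null) {
                previous.complete(null);
            }
            // register a future for this attempt, completed by the next attempt
            final CompletableFuture<Void> own = new CompletableFuture<>();
            recoveredTaskRunning.addLast(own);
            return own;
        }
    }

    public static void main(String[] args) throws Exception {
        RecoverySignalSketch signals = new RecoverySignalSketch();
        CompletableFuture<Void> firstAttempt = signals.onTaskStarted();
        System.out.println("signalled before recovery: " + firstAttempt.isDone());

        // simulate the recovered attempt starting on another thread
        Thread recovery = new Thread(signals::onTaskStarted);
        recovery.start();

        // blocks until the recovered attempt has started (or times out)
        firstAttempt.get(5, TimeUnit.SECONDS);
        recovery.join();
        System.out.println("signalled after recovery: " + firstAttempt.isDone());
    }
}
```

The `synchronized` block matters because attempts can start on different threads; without it, two concurrent starts could both observe the same `peekLast()` entry.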

Review comment:
      To my understanding, the problem is independent of that. It is caused by a new subtask attempt being scheduled while the Context stays the same. Even if the Coordinator is re-created, the problem persists as long as the Context is not re-created. In a sense, that is what this change does: we re-create the part of the Context that is responsible for sending events.
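
To illustrate the point about the long-lived Context, here is a rough sketch under stated assumptions: the Context outlives subtask restarts, and only its event-sending part is replaced per subtask attempt, so a sender bound to the failed attempt can no longer deliver events. All names here (`Context`, `EventSender`, `subtaskStarted`) are hypothetical and are not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: names below are illustrative, not Flink's API.
final class ContextRecreationSketch {

    /** Event-sending part of the context, bound to one execution attempt of one subtask. */
    static final class EventSender {
        final int subtask;
        final int attempt;
        final List<String> sent = new ArrayList<>();
        private boolean closed;

        EventSender(int subtask, int attempt) {
            this.subtask = subtask;
            this.attempt = attempt;
        }

        void send(String event) {
            if (closed) {
                throw new IllegalStateException(
                        "sender for subtask " + subtask + ", attempt " + attempt + " is closed");
            }
            sent.add(event);
        }

        void close() {
            closed = true;
        }
    }

    /** Long-lived context: it is NOT re-created when a subtask fails and restarts. */
    static final class Context {
        private final Map<Integer, EventSender> senders = new HashMap<>();
        private final Map<Integer, Integer> startCounts = new HashMap<>();

        /** Called when a subtask (re)starts; replaces only the event-sending part. */
        EventSender subtaskStarted(int subtask) {
            EventSender previous = senders.get(subtask);
            if (previous != null) {
                // the failed attempt's sender must not deliver events to the new attempt
                previous.close();
            }
            int attempt = startCounts.merge(subtask, 1, Integer::sum) - 1;
            EventSender fresh = new EventSender(subtask, attempt);
            senders.put(subtask, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        EventSender firstAttempt = context.subtaskStarted(0);
        firstAttempt.send("event-1");

        // subtask 0 fails and is rescheduled; the Context itself stays the same
        EventSender secondAttempt = context.subtaskStarted(0);
        try {
            firstAttempt.send("stale-event");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        secondAttempt.send("event-2");
        System.out.println("attempt " + secondAttempt.attempt + " sent " + secondAttempt.sent);
    }
}
```

The design choice mirrored here is that re-creating the whole Coordinator would not help on its own. What matters is that the component actually sending events is scoped to a single subtask attempt.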




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
