This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 9400f88  [FLINK-22345][coordination] Catch pre-mature state restore for Operator Coordinators
9400f88 is described below

commit 9400f880699d38eb065917ba6e9f13ef5632654f
Author: Stephan Ewen <[email protected]>
AuthorDate: Thu Apr 22 02:06:20 2021 +0200

    [FLINK-22345][coordination] Catch pre-mature state restore for Operator Coordinators
---
 .../coordination/OperatorCoordinatorHolder.java      | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 8625fe2..10696b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -390,20 +390,24 @@ public class OperatorCoordinatorHolder
 
         // We need to do this synchronously here, otherwise we violate the contract that
         // 'subtaskFailed()' will never overtake 'subtaskReady()'.
-        // An alternative, if we ever figure out that this cannot work synchronously here,
-        // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
-        // back into the main thread executor, rather than directly calling the OperatorCoordinator
+        // ---
+        // It is also possible that by the time this method here is called, the task execution is in
+        // a no-longer running state. That happens when the scheduler deals with overlapping global
+        // failures and the restore method is in fact not yet restoring to the new execution
+        // attempts, but still targeting the previous execution attempts (and is later subsumed
+        // by another restore to the new execution attempt). This is tricky behavior that we need
+        // to work around. So if the task is no longer running, we don't call the 'subtaskReady()'
+        // method.
         FutureUtils.assertNoException(
                 sta.hasSwitchedToRunning()
                         .thenAccept(
                                 (ignored) -> {
                                     mainThreadExecutor.assertRunningInMainThread();
 
-                                    // this is a guard in case someone accidentally makes the
-                                    // notification asynchronous
-                                    assert sta.isStillRunning();
-
-                                    notifySubtaskReady(subtask, gateway);
+                                    // see bigger comment above
+                                    if (sta.isStillRunning()) {
+                                        notifySubtaskReady(subtask, gateway);
+                                    }
                                 }));
     }
 

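The behavioral change in this commit (replacing a hard `assert` with a conditional check before the "ready" notification) can be illustrated in isolation. The following is only a minimal sketch using plain `CompletableFuture`; the names `GuardedReadyNotification`, `FakeSubtask`, `readySubtasks`, and `setupSubtaskGateway` are hypothetical stand-ins and are not Flink's actual classes or the code of `OperatorCoordinatorHolder`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class GuardedReadyNotification {

    /** Hypothetical stand-in for the subtask access object ('sta') seen in the diff. */
    static final class FakeSubtask {
        final CompletableFuture<Void> switchedToRunning = new CompletableFuture<>();
        volatile boolean stillRunning = true;
    }

    /** Hypothetical record of "ready" notifications; stands in for the coordinator. */
    static final List<Integer> readySubtasks = new ArrayList<>();

    static void setupSubtaskGateway(int subtaskIndex, FakeSubtask task) {
        task.switchedToRunning.thenAccept(
                ignored -> {
                    // Guard instead of assert: an execution attempt that was already
                    // superseded (e.g. by an overlapping restore) is skipped silently
                    // rather than treated as a fatal invariant violation.
                    if (task.stillRunning) {
                        readySubtasks.add(subtaskIndex);
                    }
                });
    }

    public static void main(String[] args) {
        // Normal case: the task is still running when the future completes.
        FakeSubtask healthy = new FakeSubtask();
        setupSubtaskGateway(0, healthy);
        healthy.switchedToRunning.complete(null);

        // Overlapping-failure case: the attempt is no longer running by the
        // time the "switched to running" future completes.
        FakeSubtask superseded = new FakeSubtask();
        setupSubtaskGateway(1, superseded);
        superseded.stillRunning = false;
        superseded.switchedToRunning.complete(null);

        System.out.println(readySubtasks); // prints [0]
    }
}
```

In this sketch the callback runs synchronously inside `complete(...)`, which mirrors the diff's requirement that the notification is not deferred; only the still-running subtask is reported as ready.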