This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 9400f88 [FLINK-22345][coordination] Catch pre-mature state restore for Operator Coordinators
9400f88 is described below
commit 9400f880699d38eb065917ba6e9f13ef5632654f
Author: Stephan Ewen <[email protected]>
AuthorDate: Thu Apr 22 02:06:20 2021 +0200
[FLINK-22345][coordination] Catch pre-mature state restore for Operator Coordinators
---
.../coordination/OperatorCoordinatorHolder.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 8625fe2..10696b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -390,20 +390,24 @@ public class OperatorCoordinatorHolder
// We need to do this synchronously here, otherwise we violate the contract that
// 'subtaskFailed()' will never overtake 'subtaskReady()'.
- // An alternative, if we ever figure out that this cannot work synchronously here,
- // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
- // back into the main thread executor, rather than directly calling the OperatorCoordinator
+ // ---
+ // It is also possible that by the time this method here is called, the task execution is in
+ // a no-longer running state. That happens when the scheduler deals with overlapping global
+ // failures and the restore method is in fact not yet restoring to the new execution
+ // attempts, but still targeting the previous execution attempts (and is later subsumed
+ // by another restore to the new execution attempt). This is tricky behavior that we need
+ // to work around. So if the task is no longer running, we don't call the 'subtaskReady()'
+ // method.
FutureUtils.assertNoException(
sta.hasSwitchedToRunning()
.thenAccept(
(ignored) -> {
mainThreadExecutor.assertRunningInMainThread();
- // this is a guard in case someone accidentally makes the
- // notification asynchronous
- assert sta.isStillRunning();
-
- notifySubtaskReady(subtask, gateway);
+ // see bigger comment above
+ if (sta.isStillRunning()) {
+ notifySubtaskReady(subtask, gateway);
+ }
}));
}
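The change above replaces an `assert` with a runtime guard: when a restore still targets a stale execution attempt (overlapping global failures), the "switched to running" future may complete for an attempt that is no longer running, and with assertions enabled the old `assert` would have failed inside the callback. Below is a minimal, self-contained sketch of the guarded pattern, assuming a simplified single-threaded model. `AttemptAccess`, `markRunning()`, `markFailed()`, `setupSubtaskGateway()` and `readyNotifications` are hypothetical stand-ins, not Flink API; only `hasSwitchedToRunning()`, `isStillRunning()` and `notifySubtaskReady()` echo names from the diff, and their behavior here is simplified (no main-thread executor, no `FutureUtils.assertNoException`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ReadyGuardSketch {

    /** Hypothetical, simplified stand-in for one execution attempt of a subtask. */
    static final class AttemptAccess {
        private final CompletableFuture<Void> switchedToRunning = new CompletableFuture<>();
        private volatile boolean stillRunning = false;

        CompletableFuture<Void> hasSwitchedToRunning() { return switchedToRunning; }

        boolean isStillRunning() { return stillRunning; }

        // Set the flag before completing the future so callbacks observe "running".
        void markRunning() {
            stillRunning = true;
            switchedToRunning.complete(null);
        }

        void markFailed() { stillRunning = false; }
    }

    /** Counts how often the (hypothetical) coordinator was told a subtask is ready. */
    static final AtomicInteger readyNotifications = new AtomicInteger();

    static void notifySubtaskReady(int subtask) { readyNotifications.incrementAndGet(); }

    /** Attaches the "ready" notification, guarded instead of asserted. */
    static void setupSubtaskGateway(AttemptAccess sta, int subtask) {
        sta.hasSwitchedToRunning().thenAccept(ignored -> {
            // The attempt may already have left the running state (stale restore target).
            if (sta.isStillRunning()) {
                notifySubtaskReady(subtask);
            }
        });
    }

    public static void main(String[] args) {
        // Stale attempt: it ran and failed before the overlapping restore attached its callback.
        AttemptAccess stale = new AttemptAccess();
        stale.markRunning();
        stale.markFailed();
        setupSubtaskGateway(stale, 0); // callback runs immediately, guard skips the notification

        // New attempt: callback attached first, attempt switches to running afterwards.
        AttemptAccess fresh = new AttemptAccess();
        setupSubtaskGateway(fresh, 0);
        fresh.markRunning(); // callback runs here and notifies

        System.out.println("ready notifications: " + readyNotifications.get());
    }
}
```

Running `main` should print `ready notifications: 1`: only the new attempt produces a ready notification. The guard keeps the stale case from failing the callback future, which in the real code is wrapped in `FutureUtils.assertNoException`.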