This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fd59b6dd62f72f575ad662eb41cb9ade1887f91
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Nov 10 02:06:08 2020 +0800

    [FLINK-20050][runtime/operator] Fix methods that are only visible for
testing in RecreateOnResetOperatorCoordinator, so that they work with the fully
asynchronous thread model.
---
 .../operators/coordination/RecreateOnResetOperatorCoordinator.java  | 6 ++++--
 .../coordination/RecreateOnResetOperatorCoordinatorTest.java        | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 8ee69ea..af19225 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -131,12 +131,14 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
        // ---------------------
 
        @VisibleForTesting
-       public OperatorCoordinator getInternalCoordinator() {
+       public OperatorCoordinator getInternalCoordinator() throws Exception {
+               waitForAllAsyncCallsFinish();
                return coordinator.internalCoordinator;
        }
 
        @VisibleForTesting
-       QuiesceableContext getQuiesceableContext() {
+       QuiesceableContext getQuiesceableContext() throws Exception {
+               waitForAllAsyncCallsFinish();
                return coordinator.internalQuiesceableContext;
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index c62b73b..3813059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -239,7 +239,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
                return (RecreateOnResetOperatorCoordinator) 
provider.create(context);
        }
 
-       private TestingOperatorCoordinator 
getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) {
+       private TestingOperatorCoordinator 
getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) throws 
Exception {
                return (TestingOperatorCoordinator) 
coordinator.getInternalCoordinator();
        }
 

Reply via email to