This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9fd59b6dd62f72f575ad662eb41cb9ade1887f91
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Nov 10 02:06:08 2020 +0800

    [FLINK-20050][runtime/operator] Fix methods that are only visible for testing in RecreateOnResetOperatorCoordinator, so that they work with fully asynchronous thread model.
---
 .../operators/coordination/RecreateOnResetOperatorCoordinator.java | 6 ++++--
 .../coordination/RecreateOnResetOperatorCoordinatorTest.java | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 8ee69ea..af19225 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -131,12 +131,14 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
 	// ---------------------

 	@VisibleForTesting
-	public OperatorCoordinator getInternalCoordinator() {
+	public OperatorCoordinator getInternalCoordinator() throws Exception {
+		waitForAllAsyncCallsFinish();
 		return coordinator.internalCoordinator;
 	}

 	@VisibleForTesting
-	QuiesceableContext getQuiesceableContext() {
+	QuiesceableContext getQuiesceableContext() throws Exception {
+		waitForAllAsyncCallsFinish();
 		return coordinator.internalQuiesceableContext;
 	}

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index c62b73b..3813059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -239,7 +239,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
 		return (RecreateOnResetOperatorCoordinator) provider.create(context);
 	}

-	private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) {
+	private TestingOperatorCoordinator getInternalCoordinator(RecreateOnResetOperatorCoordinator coordinator) throws Exception {
 		return (TestingOperatorCoordinator) coordinator.getInternalCoordinator();
 	}
