This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 134bf64  [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
134bf64 is described below

commit 134bf642e5a82abb0a7f8856501550a60a520226
Author: tison <wander4...@gmail.com>
AuthorDate: Mon Sep 23 14:26:21 2019 +0800

    [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services

    This closes #9743.
---
 .../org/apache/flink/runtime/minicluster/MiniCluster.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ddb89c..5839a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,7 +97,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -430,15 +429,15 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				componentsTerminationFuture,
 				this::closeMetricSystem);
 
-			// shut down the RpcServices
-			final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture
-				.thenCompose((Void ignored) -> terminateRpcServices());
+			final CompletableFuture<Void> rpcServicesTerminationFuture = FutureUtils.composeAfterwards(
+				metricSystemTerminationFuture,
+				this::terminateRpcServices);
 
 			final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(
 				rpcServicesTerminationFuture,
 				this::terminateMiniClusterServices);
 
-			final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.runAfterwards(
+			final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.composeAfterwards(
 				remainingServicesTerminationFuture,
 				() -> terminateExecutors(shutdownTimeoutMillis));
 
@@ -832,7 +831,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	}
 
 	@Nonnull
-	private CompletionStage<Void> terminateRpcServices() {
+	private CompletableFuture<Void> terminateRpcServices() {
 		synchronized (lock) {
 			final int numRpcServices = 1 + rpcServices.size();