This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 134bf64  [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
134bf64 is described below

commit 134bf642e5a82abb0a7f8856501550a60a520226
Author: tison <wander4...@gmail.com>
AuthorDate: Mon Sep 23 14:26:21 2019 +0800

    [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
    
    This closes #9743.
---
 .../org/apache/flink/runtime/minicluster/MiniCluster.java     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ddb89c..5839a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,7 +97,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -430,15 +429,15 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                                componentsTerminationFuture,
                                                this::closeMetricSystem);
 
-                                       // shut down the RpcServices
-                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricSystemTerminationFuture
-                                               .thenCompose((Void ignored) -> 
terminateRpcServices());
+                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = FutureUtils.composeAfterwards(
+                                               metricSystemTerminationFuture,
+                                               this::terminateRpcServices);
 
                                        final CompletableFuture<Void> 
remainingServicesTerminationFuture = FutureUtils.runAfterwards(
                                                rpcServicesTerminationFuture,
                                                
this::terminateMiniClusterServices);
 
-                                       final CompletableFuture<Void> 
executorsTerminationFuture = FutureUtils.runAfterwards(
+                                       final CompletableFuture<Void> 
executorsTerminationFuture = FutureUtils.composeAfterwards(
                                                
remainingServicesTerminationFuture,
                                                () -> 
terminateExecutors(shutdownTimeoutMillis));
 
@@ -832,7 +831,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        }
 
        @Nonnull
-       private CompletionStage<Void> terminateRpcServices() {
+       private CompletableFuture<Void> terminateRpcServices() {
                synchronized (lock) {
                        final int numRpcServices = 1 + rpcServices.size();
 

Reply via email to