This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c927e176f35b9db0f9daec7b67ccdb41d8e7d9f5
Author: tison <[email protected]>
AuthorDate: Fri Nov 29 09:37:47 2019 +0800

    [FLINK-14762][client] Handle clients close gracefully
---
 .../deployment/AbstractJobClusterExecutor.java     | 22 +++++++++++++-
 .../deployment/AbstractSessionClusterExecutor.java |  7 ++++-
 .../deployment/ClusterClientJobClientAdapter.java  | 35 ++++++++++------------
 .../apache/flink/client/program/ClusterClient.java |  2 +-
 .../client/program/rest/RestClusterClient.java     | 29 ++++++++++--------
 .../apache/flink/client/program/ClientTest.java    |  2 +-
 .../org/apache/flink/core/execution/JobClient.java |  5 ++++
 .../java/ExecutorDiscoveryAndJobClientTest.java    |  5 ----
 .../ExecutorDiscoveryAndJobClientTest.java         |  5 ----
 9 files changed, 67 insertions(+), 45 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 0fd03e6..6a18c2f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,26 @@ public class AbstractJobClusterExecutor<ClusterID, 
ClientFactory extends Cluster
                        LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
 
                        final boolean withShutdownHook = 
!configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit();
-                       return CompletableFuture.completedFuture(new 
ClusterClientJobClientAdapter<>(clusterClient, jobGraph.getJobID(), 
withShutdownHook));
+
+                       if (withShutdownHook) {
+                               Thread shutdownHook = 
ShutdownHookUtil.addShutdownHook(
+                                       clusterClient::shutDownCluster, 
clusterClient.getClass().getSimpleName(), LOG);
+
+                               return CompletableFuture.completedFuture(new 
ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) {
+                                       @Override
+                                       protected void doClose() {
+                                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
clusterClient.getClass().getSimpleName(), LOG);
+                                               clusterClient.close();
+                                       }
+                               });
+                       } else {
+                               return CompletableFuture.completedFuture(new 
ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) {
+                                       @Override
+                                       protected void doClose() {
+                                               clusterClient.close();
+                                       }
+                               });
+                       }
                }
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index 65382ba..a1a1cb2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -61,7 +61,12 @@ public class AbstractSessionClusterExecutor<ClusterID, 
ClientFactory extends Clu
                        return clusterClient
                                        .submitJob(jobGraph)
                                        
.thenApply(JobSubmissionResult::getJobID)
-                                       .thenApply(jobID -> new 
ClusterClientJobClientAdapter<>(clusterClient, jobID, false));
+                                       .thenApply(jobID -> new 
ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) {
+                                               @Override
+                                               protected void doClose() {
+                                                       clusterClient.close();
+                                               }
+                                       });
                }
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index f4a7376..6ba558d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -26,14 +26,11 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.ShutdownHookUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -42,24 +39,15 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(ClusterClientJobClientAdapter.class);
-
        private final ClusterClient<ClusterID> clusterClient;
 
        private final JobID jobID;
 
-       private final Thread shutdownHook;
+       private final AtomicBoolean running = new AtomicBoolean(true);
 
-       public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> 
clusterClient, final JobID jobID, final boolean withShutdownHook) {
+       public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> 
clusterClient, final JobID jobID) {
                this.jobID = checkNotNull(jobID);
                this.clusterClient = checkNotNull(clusterClient);
-
-               if (withShutdownHook) {
-                       shutdownHook = ShutdownHookUtil.addShutdownHook(
-                                       clusterClient::shutDownCluster, 
clusterClient.getClass().getSimpleName(), LOG);
-               } else {
-                       shutdownHook = null;
-               }
        }
 
        @Override
@@ -87,10 +75,19 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
        }
 
        @Override
-       public void close() throws Exception {
-               if (shutdownHook != null) {
-                       ShutdownHookUtil.removeShutdownHook(shutdownHook, 
clusterClient.getClass().getSimpleName(), LOG);
+       public final void close() {
+               if (running.compareAndSet(true, false)) {
+                       doClose();
                }
-               this.clusterClient.close();
+       }
+
+       /**
+        * Method to be overridden by subclass which contains actual close 
actions.
+        *
+        * <p>We do close in this way to ensure multiple calls to {@link 
#close()}
+        * are executed at most once guarded by {@link #running} flag.
+        */
+       protected void doClose() {
+
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index ad14072..eadcbe7 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
 public interface ClusterClient<T> extends AutoCloseable {
 
        @Override
-       default void close() throws Exception {
+       default void close() {
 
        }
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index c6e1954..40c2b64 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -119,6 +119,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -150,6 +151,8 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
 
        private final LeaderRetriever webMonitorLeaderRetriever = new 
LeaderRetriever();
 
+       private final AtomicBoolean running = new AtomicBoolean(true);
+
        /** ExecutorService to run operations that can be retried on 
exceptions. */
        private ScheduledExecutorService retryExecutorService;
 
@@ -199,21 +202,23 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
 
        @Override
        public void close() {
-               
ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), 
TimeUnit.MILLISECONDS, retryExecutorService);
+               if (running.compareAndSet(true, false)) {
+                       
ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), 
TimeUnit.MILLISECONDS, retryExecutorService);
 
-               this.restClient.shutdown(Time.seconds(5));
-               ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.executorService);
+                       this.restClient.shutdown(Time.seconds(5));
+                       ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.executorService);
 
-               try {
-                       webMonitorRetrievalService.stop();
-               } catch (Exception e) {
-                       LOG.error("An error occurred during stopping the 
WebMonitorRetrievalService", e);
-               }
+                       try {
+                               webMonitorRetrievalService.stop();
+                       } catch (Exception e) {
+                               LOG.error("An error occurred during stopping 
the WebMonitorRetrievalService", e);
+                       }
 
-               try {
-                       clientHAServices.close();
-               } catch (Exception e) {
-                       LOG.error("An error occurred during stopping the 
ClientHighAvailabilityServices", e);
+                       try {
+                               clientHAServices.close();
+                       } catch (Exception e) {
+                               LOG.error("An error occurred during stopping 
the ClientHighAvailabilityServices", e);
+                       }
                }
        }
 
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 20fd3ed..3ed9e96 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -396,7 +396,7 @@ public class ClientTest extends TestLogger {
                                                
jobGraph.setClasspaths(accessor.getClasspaths());
 
                                                final JobID jobID = 
ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
-                                               return 
CompletableFuture.completedFuture(new 
ClusterClientJobClientAdapter<>(clusterClient, jobID, false));
+                                               return 
CompletableFuture.completedFuture(new 
ClusterClientJobClientAdapter<>(clusterClient, jobID));
                                        };
                                }
                        };
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 3a905f6..fd99840 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -41,4 +41,9 @@ public interface JobClient extends AutoCloseable {
         * @param userClassloader the classloader used to de-serialize the 
accumulators of the job.
         */
        CompletableFuture<JobExecutionResult> getJobExecutionResult(final 
ClassLoader userClassloader);
+
+       @Override
+       default void close() {
+
+       }
 }
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index 77b14db..176d50f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -92,11 +92,6 @@ public class ExecutorDiscoveryAndJobClientTest {
                                public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(ClassLoader userClassloader) {
                                        return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, 
Collections.emptyMap()));
                                }
-
-                               @Override
-                               public void close() {
-
-                               }
                        });
                }
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 4f1ac69..6dc8be2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -92,11 +92,6 @@ public class ExecutorDiscoveryAndJobClientTest {
                                public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(ClassLoader userClassloader) {
                                        return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, 
Collections.emptyMap()));
                                }
-
-                               @Override
-                               public void close() {
-
-                               }
                        });
                }
        }

Reply via email to