This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c927e176f35b9db0f9daec7b67ccdb41d8e7d9f5 Author: tison <[email protected]> AuthorDate: Fri Nov 29 09:37:47 2019 +0800 [FLINK-14762][client] Handle clients close gracefully --- .../deployment/AbstractJobClusterExecutor.java | 22 +++++++++++++- .../deployment/AbstractSessionClusterExecutor.java | 7 ++++- .../deployment/ClusterClientJobClientAdapter.java | 35 ++++++++++------------ .../apache/flink/client/program/ClusterClient.java | 2 +- .../client/program/rest/RestClusterClient.java | 29 ++++++++++-------- .../apache/flink/client/program/ClientTest.java | 2 +- .../org/apache/flink/core/execution/JobClient.java | 5 ++++ .../java/ExecutorDiscoveryAndJobClientTest.java | 5 ---- .../ExecutorDiscoveryAndJobClientTest.java | 5 ---- 9 files changed, 67 insertions(+), 45 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java index 0fd03e6..6a18c2f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,26 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); final boolean withShutdownHook = !configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit(); - return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(clusterClient, jobGraph.getJobID(), withShutdownHook)); + + if (withShutdownHook) { + Thread shutdownHook = ShutdownHookUtil.addShutdownHook( + clusterClient::shutDownCluster, clusterClient.getClass().getSimpleName(), LOG); + + return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) { + @Override + protected void doClose() { + ShutdownHookUtil.removeShutdownHook(shutdownHook, clusterClient.getClass().getSimpleName(), LOG); + clusterClient.close(); + } + }); + } else { + return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobGraph.getJobID()) { + @Override + protected void doClose() { + clusterClient.close(); + } + }); + } } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java index 65382ba..a1a1cb2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java @@ -61,7 +61,12 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu return clusterClient .submitJob(jobGraph) .thenApply(JobSubmissionResult::getJobID) - .thenApply(jobID -> new ClusterClientJobClientAdapter<>(clusterClient, jobID, false)); + .thenApply(jobID -> new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) { + @Override + protected void doClose() { + clusterClient.close(); + } + }); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index f4a7376..6ba558d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -26,14 +26,11 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.ShutdownHookUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -42,24 +39,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { - private static final Logger LOG = LoggerFactory.getLogger(ClusterClientJobClientAdapter.class); - private final ClusterClient<ClusterID> clusterClient; private final JobID jobID; - private final Thread shutdownHook; + private final AtomicBoolean running = new AtomicBoolean(true); - public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> clusterClient, final JobID jobID, final boolean withShutdownHook) { + public ClusterClientJobClientAdapter(final ClusterClient<ClusterID> clusterClient, final JobID jobID) { this.jobID = checkNotNull(jobID); this.clusterClient = checkNotNull(clusterClient); - - if (withShutdownHook) { - shutdownHook = ShutdownHookUtil.addShutdownHook( - clusterClient::shutDownCluster, clusterClient.getClass().getSimpleName(), LOG); - } else { - shutdownHook = null; - } } @Override @@ -87,10 +75,19 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { } @Override - public void close() throws Exception { - if (shutdownHook != null) { - ShutdownHookUtil.removeShutdownHook(shutdownHook, clusterClient.getClass().getSimpleName(), LOG); + public final void close() { + if (running.compareAndSet(true, false)) { + doClose(); } - this.clusterClient.close(); + } + + /** + * Method to be overridden by subclass which contains actual close actions. + * + * <p>We do close in this way to ensure multiple calls to {@link #close()} + * are executed at most once guarded by {@link #running} flag. + */ + protected void doClose() { + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index ad14072..eadcbe7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -44,7 +44,7 @@ import java.util.concurrent.CompletableFuture; public interface ClusterClient<T> extends AutoCloseable { @Override - default void close() throws Exception { + default void close() { } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index c6e1954..40c2b64 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -119,6 +119,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -150,6 +151,8 @@ public class RestClusterClient<T> implements ClusterClient<T> { private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever(); + private final AtomicBoolean running = new AtomicBoolean(true); + /** ExecutorService to run operations that can be retried on exceptions. */ private ScheduledExecutorService retryExecutorService; @@ -199,21 +202,23 @@ public class RestClusterClient<T> implements ClusterClient<T> { @Override public void close() { - ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService); + if (running.compareAndSet(true, false)) { + ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService); - this.restClient.shutdown(Time.seconds(5)); - ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); + this.restClient.shutdown(Time.seconds(5)); + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); - try { - webMonitorRetrievalService.stop(); - } catch (Exception e) { - LOG.error("An error occurred during stopping the WebMonitorRetrievalService", e); - } + try { + webMonitorRetrievalService.stop(); + } catch (Exception e) { + LOG.error("An error occurred during stopping the WebMonitorRetrievalService", e); + } - try { - clientHAServices.close(); - } catch (Exception e) { - LOG.error("An error occurred during stopping the ClientHighAvailabilityServices", e); + try { + clientHAServices.close(); + } catch (Exception e) { + LOG.error("An error occurred during stopping the ClientHighAvailabilityServices", e); + } } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 20fd3ed..3ed9e96 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -396,7 +396,7 @@ public class ClientTest extends TestLogger { jobGraph.setClasspaths(accessor.getClasspaths()); final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID(); - return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(clusterClient, jobID, false)); + return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(clusterClient, jobID)); }; } }; diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 3a905f6..fd99840 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -41,4 +41,9 @@ public interface JobClient extends AutoCloseable { * @param userClassloader the classloader used to de-serialize the accumulators of the job. */ CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader); + + @Override + default void close() { + + } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java index 77b14db..176d50f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java @@ -92,11 +92,6 @@ public class ExecutorDiscoveryAndJobClientTest { public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap())); } - - @Override - public void close() { - - } }); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java index 4f1ac69..6dc8be2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java @@ -92,11 +92,6 @@ public class ExecutorDiscoveryAndJobClientTest { public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap())); } - - @Override - public void close() { - - } }); } }
