asfgit closed pull request #6540: [FLINK-9891] Added hook to shutdown cluster 
if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a615b..780f8144029 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ protected void run(String[] args) throws Exception {
                                        LOG.info("Could not properly shut down 
the client.", e);
                                }
                        } else {
+                               final Thread shutdownHook;
                                if (clusterId != null) {
                                        client = 
clusterDescriptor.retrieve(clusterId);
+                                       shutdownHook = null;
                                } else {
                                        // also in job mode we have to deploy a 
session cluster because the job
                                        // might consist of multiple parts 
(e.g. when using collect)
                                        final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
                                        client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
+                                       // if not running in detached mode, add 
a shutdown hook to shut down cluster if client exits
+                                       // there's a race-condition here if cli 
is killed before shutdown hook is installed
+                                       if (!runOptions.getDetachedMode()) {
+                                               shutdownHook = 
ShutdownHookUtil.addShutdownHook(client::shutDownCluster, 
client.getClass().getSimpleName(), LOG);
+                                       } else {
+                                               shutdownHook = null;
+                                       }
                                }
 
                                try {
@@ -278,12 +288,12 @@ protected void run(String[] args) throws Exception {
 
                                        executeProgram(program, client, 
userParallelism);
                                } finally {
-                                       if (clusterId == null && 
!client.isDetached()) {
+                                       if (shutdownHook != null) {
                                                // terminate the cluster only 
if we have started it before and if it's not detached
                                                try {
-                                                       
client.shutDownCluster();
-                                               } catch (final Exception e) {
-                                                       LOG.info("Could not 
properly terminate the Flink cluster.", e);
+                                                       shutdownHook.run();
+                                               } finally {
+                                                       
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
client.getClass().getSimpleName(), LOG);
                                                }
                                        }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to