asfgit closed pull request #6540: [FLINK-9891] Added hook to shutdown cluster
if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a615b..780f8144029 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ protected void run(String[] args) throws Exception {
LOG.info("Could not properly shut down
the client.", e);
}
} else {
+ final Thread shutdownHook;
if (clusterId != null) {
client =
clusterDescriptor.retrieve(clusterId);
+ shutdownHook = null;
} else {
// also in job mode we have to deploy a
session cluster because the job
// might consist of multiple parts
(e.g. when using collect)
final ClusterSpecification
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
client =
clusterDescriptor.deploySessionCluster(clusterSpecification);
+ // if not running in detached mode, add
a shutdown hook to shut down cluster if client exits
+ // there's a race-condition here if cli
is killed before shutdown hook is installed
+ if (!runOptions.getDetachedMode()) {
+ shutdownHook =
ShutdownHookUtil.addShutdownHook(client::shutDownCluster,
client.getClass().getSimpleName(), LOG);
+ } else {
+ shutdownHook = null;
+ }
}
try {
@@ -278,12 +288,12 @@ protected void run(String[] args) throws Exception {
executeProgram(program, client,
userParallelism);
} finally {
- if (clusterId == null &&
!client.isDetached()) {
+ if (shutdownHook != null) {
// terminate the cluster only
if we have started it before and if it's not detached
try {
-
client.shutDownCluster();
- } catch (final Exception e) {
- LOG.info("Could not
properly terminate the Flink cluster.", e);
+ shutdownHook.run();
+ } finally {
+
ShutdownHookUtil.removeShutdownHook(shutdownHook,
client.getClass().getSimpleName(), LOG);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services