This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd521c1665ae0e35ec985f1c45eceaf0c729bb87
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Sat Dec 7 12:04:24 2019 +0100

    [FLINK-15116] Move attached shutdown hook to context environment
    
    This simplifies the per-job executor and moves the logic to where it
    actually belongs. The previous solution had some issues that this fixes:
    
     - the context environment is the code that knows how and why we're
     submitting a job, because it is what the Flink CLI uses
     - having the code in that one executor made the executor more
     complicated just to support a CLI feature
     - the shutdown hook was not available for jobs submitted against an
     existing session. I think the expected behaviour is that an attached
     job that is submitted from the CLI shuts down no matter the execution
     mode.
    
    The "shutdown on attached exit" parameter now has the semantics: "cancel
    job when the CLI shuts down". For a per-job cluster this has the same
    effect as before: the cluster shuts down. For session clusters this has
    the effect that the job is canceled.
    
    For this to work we have to change the MiniDispatcher to shut down the
    cluster when a job is canceled.
---
 .../deployment/AbstractJobClusterExecutor.java     | 23 ++-----------
 .../flink/client/program/ContextEnvironment.java   | 35 ++++++++++++++++++-
 .../flink/runtime/dispatcher/MiniDispatcher.java   | 17 ++++++++--
 .../api/environment/StreamContextEnvironment.java  | 39 +++++++++++++++++++++-
 4 files changed, 89 insertions(+), 25 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index f564f07..460c22a 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -21,13 +21,11 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,25 +66,8 @@ public class AbstractJobClusterExecutor<ClusterID, 
ClientFactory extends Cluster
                                        .deployJobCluster(clusterSpecification, 
jobGraph, configAccessor.getDetachedMode());
                        LOG.info("Job has been submitted with JobID " + 
jobGraph.getJobID());
 
-                       final boolean withShutdownHook = 
!configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit();
-
-                       if (withShutdownHook) {
-                               ShutdownHookUtil.addShutdownHook(
-                                               () -> {
-                                                       try 
(ClusterClient<ClusterID> client = clusterClientProvider.getClusterClient()) {
-                                                               
client.shutDownCluster();
-                                                       }
-                                               },
-                                               "Cluster shutdown hook for 
attached Job execution",
-                                               LOG);
-
-                               return CompletableFuture.completedFuture(
-                                               new 
ClusterClientJobClientAdapter<ClusterID>(clusterClientProvider, 
jobGraph.getJobID()) {
-                               });
-                       } else {
-                               return CompletableFuture.completedFuture(
-                                               new 
ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
-                       }
+                       return CompletableFuture.completedFuture(
+                                       new 
ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
                }
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 7140dec..15ce9b2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -27,7 +27,14 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.util.ShutdownHookUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -37,6 +44,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
+       private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
+
        private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
        private boolean alreadyCalled;
@@ -61,7 +70,31 @@ public class ContextEnvironment extends ExecutionEnvironment 
{
        public JobExecutionResult execute(String jobName) throws Exception {
                verifyExecuteIsCalledOnceWhenInDetachedMode();
 
-               final JobExecutionResult jobExecutionResult = 
super.execute(jobName);
+               JobClient jobClient = executeAsync(jobName).get(); // wait for the JobClient produced by executeAsync
+
+               JobExecutionResult jobExecutionResult;
+               if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { // attached: block on the job result below
+                       CompletableFuture<JobExecutionResult> 
jobExecutionResultFuture =
+                                       
jobClient.getJobExecutionResult(getUserCodeClassLoader());
+
+                       if 
(getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { // cancel the job if the JVM exits while still waiting
+                               Thread shutdownHook = 
ShutdownHookUtil.addShutdownHook(
+                                               () -> {
+                                                       // cancel the job, waiting at most one second for the async
+                                                       // request to go through before the JVM exits
+                                                       
jobClient.cancel().get(1, TimeUnit.SECONDS);
+                                               },
+                                               
ContextEnvironment.class.getSimpleName(),
+                                               LOG);
+                               jobExecutionResultFuture.whenComplete((ignored, 
throwable) -> // result arrived or failed: the hook is no longer needed
+                                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
ContextEnvironment.class.getSimpleName(), LOG));
+                       }
+
+                       jobExecutionResult = jobExecutionResultFuture.get();
+               } else { // detached: report only the job id, without waiting for a result
+                       jobExecutionResult = new 
DetachedJobExecutionResult(jobClient.getJobID());
+               }
+
                setJobExecutionResult(jobExecutionResult);
                return jobExecutionResult;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 37b8799..f682cc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -94,17 +94,30 @@ public class MiniDispatcher extends Dispatcher {
                                ApplicationStatus status = 
result.getSerializedThrowable().isPresent() ?
                                                ApplicationStatus.FAILED : 
ApplicationStatus.SUCCEEDED;
 
-                               LOG.debug("Shutting down cluster because 
someone retrieved the job result.");
+                               LOG.debug("Shutting down per-job cluster 
because someone retrieved the job result.");
                                shutDownFuture.complete(status);
                        });
                } else {
-                       LOG.debug("Not shutting down cluster after someone 
retrieved the job result.");
+                       LOG.debug("Not shutting down per-job cluster after 
someone retrieved the job result.");
                }
 
                return jobResultFuture;
        }
 
        @Override
+       public CompletableFuture<Acknowledge> cancelJob(
+                       JobID jobId, Time timeout) { // on successful cancellation, also completes shutDownFuture with CANCELED
+               CompletableFuture<Acknowledge> cancelFuture = 
super.cancelJob(jobId, timeout);
+
+               cancelFuture.thenAccept((ignored) -> { // runs only if the cancel request succeeded; failures reach the caller via cancelFuture
+                       LOG.debug("Shutting down per-job cluster because the 
job was canceled.");
+                       shutDownFuture.complete(ApplicationStatus.CANCELED); // NOTE(review): fires on cancel acknowledgement, possibly before the job is fully CANCELED - confirm
+               });
+
+               return cancelFuture;
+       }
+
+       @Override
        protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
                super.jobReachedGloballyTerminalState(archivedExecutionGraph);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 62960b4..23d7c78 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,8 +19,19 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.DetachedJobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -32,6 +43,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionEnvironment.class); // NOTE(review): logs under the ExecutionEnvironment category, not this class - confirm intended
+
        private final ContextEnvironment ctx;
 
        StreamContextEnvironment(final ContextEnvironment ctx) {
@@ -49,7 +62,31 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
        public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
                transformations.clear();
 
-               final JobExecutionResult jobExecutionResult = super.execute(streamGraph);
+               JobClient jobClient = executeAsync(streamGraph).get(); // wait for the JobClient produced by executeAsync
+
+               JobExecutionResult jobExecutionResult;
+               if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { // attached: block on the job result below
+                       CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
+                                       jobClient.getJobExecutionResult(ctx.getUserCodeClassLoader());
+
+                       if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { // cancel the job if the JVM exits while still waiting
+                               Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
+                                               () -> {
+                                                       // cancel the job, waiting at most one second for the async
+                                                       // request to go through before the JVM exits
+                                                       jobClient.cancel().get(1, TimeUnit.SECONDS);
+                                               },
+                                               StreamContextEnvironment.class.getSimpleName(),
+                                               LOG);
+                               jobExecutionResultFuture.whenComplete((ignored, throwable) -> // result arrived or failed: the hook is no longer needed
+                                               ShutdownHookUtil.removeShutdownHook(shutdownHook, StreamContextEnvironment.class.getSimpleName(), LOG));
+                       }
+
+                       jobExecutionResult = jobExecutionResultFuture.get();
+               } else { // detached: report only the job id, without waiting for a result
+                       jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
+               }
+
                ctx.setJobExecutionResult(jobExecutionResult);
                return jobExecutionResult;
        }

Reply via email to