This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd521c1665ae0e35ec985f1c45eceaf0c729bb87 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Sat Dec 7 12:04:24 2019 +0100 [FLINK-15116] Move attached shutdown hook to context environment This simplifies the per-job executor and moves the logic to where it actually belongs. The previous solution had some issues that this fixes: - the code that knows how and why we're submitting a job is the context environment because it is used for the Flink CLI - having the code in that one executor made it more complicated just to support a CLI feature - the shutdown hook was not available for jobs submitted against an existing session. I think the expected behaviour is that an attached job that is submitted from the CLI shuts down no matter the execution mode. The "shutdown on attached exit" parameter now has the semantics: "cancel job when the CLI shuts down". For a per-job cluster this has the same effect as before: the cluster shuts down. For session clusters this has the effect that the job is canceled. For this to work we have to change the MiniDispatcher to shut down the cluster when a job is canceled. 
--- .../deployment/AbstractJobClusterExecutor.java | 23 ++----------- .../flink/client/program/ContextEnvironment.java | 35 ++++++++++++++++++- .../flink/runtime/dispatcher/MiniDispatcher.java | 17 ++++++++-- .../api/environment/StreamContextEnvironment.java | 39 +++++++++++++++++++++- 4 files changed, 89 insertions(+), 25 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java index f564f07..460c22a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -21,13 +21,11 @@ package org.apache.flink.client.deployment; import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,25 +66,8 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); - final boolean withShutdownHook = !configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit(); - - if (withShutdownHook) { - ShutdownHookUtil.addShutdownHook( - () -> { - try (ClusterClient<ClusterID> client = clusterClientProvider.getClusterClient()) { - client.shutDownCluster(); - } - }, - "Cluster shutdown 
hook for attached Job execution", - LOG); - - return CompletableFuture.completedFuture( - new ClusterClientJobClientAdapter<ClusterID>(clusterClientProvider, jobGraph.getJobID()) { - }); - } else { - return CompletableFuture.completedFuture( - new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID())); - } + return CompletableFuture.completedFuture( + new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID())); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 7140dec..15ce9b2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -27,7 +27,14 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.DetachedJobExecutionResult; import org.apache.flink.core.execution.ExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.util.ShutdownHookUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -37,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ContextEnvironment extends ExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); + private final AtomicReference<JobExecutionResult> jobExecutionResult; private boolean alreadyCalled; @@ -61,7 +70,31 @@ public class ContextEnvironment extends ExecutionEnvironment { public JobExecutionResult execute(String jobName) throws Exception { 
verifyExecuteIsCalledOnceWhenInDetachedMode(); - final JobExecutionResult jobExecutionResult = super.execute(jobName); + JobClient jobClient = executeAsync(jobName).get(); + + JobExecutionResult jobExecutionResult; + if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { + CompletableFuture<JobExecutionResult> jobExecutionResultFuture = + jobClient.getJobExecutionResult(getUserCodeClassLoader()); + + if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + Thread shutdownHook = ShutdownHookUtil.addShutdownHook( + () -> { + // wait a smidgen to allow the async request to go through before + // the jvm exits + jobClient.cancel().get(1, TimeUnit.SECONDS); + }, + ContextEnvironment.class.getSimpleName(), + LOG); + jobExecutionResultFuture.whenComplete((ignored, throwable) -> + ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG)); + } + + jobExecutionResult = jobExecutionResultFuture.get(); + } else { + jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID()); + } + setJobExecutionResult(jobExecutionResult); return jobExecutionResult; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 37b8799..f682cc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -94,17 +94,30 @@ public class MiniDispatcher extends Dispatcher { ApplicationStatus status = result.getSerializedThrowable().isPresent() ? 
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; - LOG.debug("Shutting down cluster because someone retrieved the job result."); + LOG.debug("Shutting down per-job cluster because someone retrieved the job result."); shutDownFuture.complete(status); }); } else { - LOG.debug("Not shutting down cluster after someone retrieved the job result."); + LOG.debug("Not shutting down per-job cluster after someone retrieved the job result."); } return jobResultFuture; } @Override + public CompletableFuture<Acknowledge> cancelJob( + JobID jobId, Time timeout) { + CompletableFuture<Acknowledge> cancelFuture = super.cancelJob(jobId, timeout); + + cancelFuture.thenAccept((ignored) -> { + LOG.debug("Shutting down per-job cluster because the job was canceled."); + shutDownFuture.complete(ApplicationStatus.CANCELED); + }); + + return cancelFuture; + } + + @Override protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) { super.jobReachedGloballyTerminalState(archivedExecutionGraph); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 62960b4..23d7c78 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -19,8 +19,19 @@ package org.apache.flink.streaming.api.environment; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.client.program.ContextEnvironment; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.DetachedJobExecutionResult; +import org.apache.flink.core.execution.JobClient; import 
org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ShutdownHookUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -32,6 +43,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @PublicEvolving public class StreamContextEnvironment extends StreamExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); + private final ContextEnvironment ctx; StreamContextEnvironment(final ContextEnvironment ctx) { @@ -49,7 +62,31 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { public JobExecutionResult execute(StreamGraph streamGraph) throws Exception { transformations.clear(); - final JobExecutionResult jobExecutionResult = super.execute(streamGraph); + JobClient jobClient = executeAsync(streamGraph).get(); + + JobExecutionResult jobExecutionResult; + if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { + CompletableFuture<JobExecutionResult> jobExecutionResultFuture = + jobClient.getJobExecutionResult(ctx.getUserCodeClassLoader()); + + if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + Thread shutdownHook = ShutdownHookUtil.addShutdownHook( + () -> { + // wait a smidgen to allow the async request to go through before + // the jvm exits + jobClient.cancel().get(1, TimeUnit.SECONDS); + }, + ContextEnvironment.class.getSimpleName(), + LOG); + jobExecutionResultFuture.whenComplete((ignored, throwable) -> + ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG)); + } + + jobExecutionResult = jobExecutionResultFuture.get(); + } else { + jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID()); + } + ctx.setJobExecutionResult(jobExecutionResult); return jobExecutionResult; }