[ https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14906511#comment-14906511 ]
ASF GitHub Bot commented on FLINK-2111:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/750#discussion_r40333992
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -452,6 +453,73 @@ public void cancel(JobID jobId) throws Exception {
}
}
+ /**
+  * Stops a program on the Flink cluster whose JobManager is configured in this client's configuration.
+  * Stopping works only for streaming programs. Be aware that the program might continue to run for
+  * a while after the stop command is sent, because after the sources stop emitting data, all operators
+  * need to finish processing.
+  *
+  * @param jobId
+  *         the job ID of the streaming program to stop
+  * @throws ProgramStopException
+  *         If the job ID is invalid (i.e., it is unknown or refers to a batch job) or if sending the stop
+  *         signal failed. That might be due to an I/O problem, e.g., the JobManager is unreachable.
+  */
+ public void stop(JobID jobId) throws Exception {
+     final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+     final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+     ActorSystem actorSystem;
+     try {
+         actorSystem = JobClient.startJobClientActorSystem(configuration);
+     } catch (Exception e) {
+         throw new ProgramStopException("Could not start client actor system.", e);
+     }
+
+     try {
+         ActorGateway jobManagerGateway;
+
+         LeaderRetrievalService leaderRetrievalService;
+
+         try {
+             leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+         } catch (Exception e) {
+             throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+         }
+
+         try {
+             jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+                     leaderRetrievalService,
+                     actorSystem,
+                     lookupTimeout);
+         } catch (LeaderRetrievalException e) {
+             throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+         }
+
+         Future<Object> response;
+         try {
+             response = jobManagerGateway.ask(new StopJob(jobId), timeout);
+         } catch (Exception e) {
+             throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+         }
+
+         Object result = Await.result(response, timeout);
+
+         if (result instanceof JobManagerMessages.StoppingSuccess) {
+             LOG.debug("Job stopping with ID " + jobId + " succeeded.");
--- End diff --
This should be logged at INFO level...
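The `stop(JobID)` method in the diff follows an ask/await/match pattern: send a `StopJob` message to the JobManager gateway, block on the reply for a bounded timeout, and branch on the reply type. Below is a minimal, self-contained sketch of that pattern (Java 16+), assuming hypothetical stand-ins (`Gateway`, `StopJob`, `StoppingSuccess`, `StoppingFailure`, `ProgramStopException`) in place of Flink's Akka-based `ActorGateway` and `JobManagerMessages`; a `CompletableFuture` with a timeout replaces `Await.result`. It is an illustration, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StopRequestSketch {

    // Hypothetical stand-ins for the JobManager messages referenced in the diff.
    record StopJob(String jobId) {}
    record StoppingSuccess(String jobId) {}
    record StoppingFailure(String jobId, Throwable cause) {}

    // Hypothetical exception type mirroring the @throws contract in the Javadoc.
    static class ProgramStopException extends Exception {
        ProgramStopException(String msg) { super(msg); }
        ProgramStopException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Hypothetical gateway: sends a message and returns a future reply.
    interface Gateway {
        CompletableFuture<Object> ask(Object message);
    }

    static void stop(Gateway gateway, String jobId, long timeoutMillis) throws ProgramStopException {
        final Object result;
        try {
            // Ask, then wait at most timeoutMillis for the reply.
            result = gateway.ask(new StopJob(jobId)).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ProgramStopException("Interrupted while waiting for the JobManager.", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new ProgramStopException("Failed to query the JobManager.", e);
        }

        // Match on the reply type; every failure path surfaces as ProgramStopException.
        if (result instanceof StoppingSuccess) {
            System.out.println("Stopping job " + jobId + " succeeded.");
        } else if (result instanceof StoppingFailure failure) {
            throw new ProgramStopException("Stopping job " + jobId + " failed.", failure.cause());
        } else {
            throw new ProgramStopException("Unexpected response: " + result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake JobManager that only accepts stop requests for "streaming-job".
        Gateway gateway = msg -> {
            StopJob request = (StopJob) msg;
            Object reply = request.jobId().equals("streaming-job")
                    ? new StoppingSuccess(request.jobId())
                    : new StoppingFailure(request.jobId(), new IllegalStateException("not stoppable"));
            return CompletableFuture.completedFuture(reply);
        };
        stop(gateway, "streaming-job", 1000);
        try {
            stop(gateway, "batch-job", 1000);
        } catch (ProgramStopException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Wrapping the timeout, the failure reply, and an unexpected reply in one exception type keeps the caller's contract to a single `throws` clause, which is what the Javadoc's `@throws ProgramStopException` describes.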
> Add "stop" signal to cleanly shutdown streaming jobs
> ----------------------------------------------------
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime, JobManager, Local Runtime,
> Streaming, TaskManager, Webfrontend
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is
> a "hard" stop without a clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks,
> such that the sources can stop emitting data and shut down cleanly, resulting
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for
> https://issues.apache.org/jira/browse/FLINK-1929
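The issue describes "stop" as a signal that reaches only the source tasks: sources stop emitting and finish, and the rest of the job shuts down once in-flight data has been processed (which is also why, per the Javadoc in the diff, a stopped job may keep running for a while). Below is a minimal, self-contained sketch of that idea in plain Java, assuming one hypothetical source thread and one hypothetical sink thread connected by a bounded queue; none of these names are Flink APIs. As a further assumption, "cancel" is modeled as a thread interrupt that skips the end-of-stream marker.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StopVersusCancelSketch {

    // Hypothetical end-of-stream marker; real elements are non-negative.
    static final int END_OF_STREAM = -1;

    // Hypothetical source task: stop() only flips a flag, so the source finishes on its own.
    static class CountingSource implements Runnable {
        private final BlockingQueue<Integer> out;
        private volatile boolean running = true;
        int emitted = 0; // read by other threads only after join()

        CountingSource(BlockingQueue<Integer> out) { this.out = out; }

        @Override
        public void run() {
            try {
                while (running) {
                    out.put(emitted); // blocks when the sink falls behind
                    emitted++;
                }
                // Clean shutdown: tell the sink that no more data will follow.
                out.put(END_OF_STREAM);
            } catch (InterruptedException e) {
                // An interrupt (the stand-in for "cancel") ends the source without the marker.
                Thread.currentThread().interrupt();
            }
        }

        void stop() { running = false; }
    }

    // Hypothetical downstream operator: keeps processing until it sees the marker.
    static class CountingSink implements Runnable {
        private final BlockingQueue<Integer> in;
        int processed = 0; // read by other threads only after join()

        CountingSink(BlockingQueue<Integer> in) { this.in = in; }

        @Override
        public void run() {
            try {
                while (in.take() != END_OF_STREAM) {
                    processed++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs the pipeline for roughly `millis` ms, stops the source, and returns {emitted, processed}.
    static int[] runAndStop(long millis) throws InterruptedException {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(8);
        CountingSource source = new CountingSource(channel);
        CountingSink sink = new CountingSink(channel);
        Thread sourceThread = new Thread(source);
        Thread sinkThread = new Thread(sink);
        sourceThread.start();
        sinkThread.start();

        Thread.sleep(millis);
        source.stop();      // only the source is signalled
        sourceThread.join();
        sinkThread.join();  // the sink ends after draining everything left in the channel
        return new int[] {source.emitted, sink.processed};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runAndStop(20);
        System.out.println("emitted=" + counts[0] + ", processed=" + counts[1]);
    }
}
```

Because the sink exits only when it takes the marker, every element the source emitted before stopping is processed, so `emitted` and `processed` match; an interrupt-based cancel gives no such guarantee.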
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)