Repository: flink Updated Branches: refs/heads/master aa6a7f04c -> 0c538915b
[FLINK-1818] add cancel method to Client This closes #642. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c538915 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c538915 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c538915 Branch: refs/heads/master Commit: 0c538915b23d9ad4e410036c9ac18c3e928ce251 Parents: aa6a7f0 Author: rainiraj <raini...@gmail.com> Authored: Thu Apr 30 08:28:29 2015 -0700 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Jul 22 20:16:59 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 43 ++++++++++++++++++++ 1 file changed, 43 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0c538915/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index c544e8d..2908f92 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -27,6 +27,8 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.List; +import akka.pattern.Patterns; +import akka.util.Timeout; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.configuration.IllegalConfigurationException; @@ -52,9 +54,12 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -421,6 +426,44 @@ public class Client { } } + /** + * Cancels a job identified by the job id. + * @param jobId the job id + * @throws Exception In case an error occurred. + */ + public void cancel(JobID jobId) throws Exception { + final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); + + ActorSystem actorSystem; + try { + actorSystem = JobClient.startJobClientActorSystem(configuration); + } catch (Exception e) { + throw new ProgramInvocationException("Could start client actor system.", e); + } + + ActorRef jobManager; + try { + jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout); + } catch (Exception e) { + LOG.error("Error in getting the remote reference for the job manager", e); + throw new ProgramInvocationException("Failed to resolve JobManager", e); + } + + Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId), new Timeout(timeout)); + Object result = Await.result(response, timeout); + + if (result instanceof JobManagerMessages.CancellationSuccess) { + LOG.debug("Job cancellation with ID " + jobId + " succeeded."); + } else if (result instanceof JobManagerMessages.CancellationFailure) { + Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); + LOG.debug("Job cancellation with ID " + jobId + " failed.", t); + throw new Exception("Failed to cancel the job because of \n" + t.getMessage()); + } else { + throw new Exception("Unknown message received while cancelling."); + } + } + + // -------------------------------------------------------------------------------------------- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {