Repository: flink
Updated Branches:
  refs/heads/master aa6a7f04c -> 0c538915b


[FLINK-1818] add cancel method to Client

This closes #642.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c538915
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c538915
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c538915

Branch: refs/heads/master
Commit: 0c538915b23d9ad4e410036c9ac18c3e928ce251
Parents: aa6a7f0
Author: rainiraj <raini...@gmail.com>
Authored: Thu Apr 30 08:28:29 2015 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Jul 22 20:16:59 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java | 43 ++++++++++++++++++++
 1 file changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c538915/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c544e8d..2908f92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -27,6 +27,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -52,9 +54,12 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -421,6 +426,44 @@ public class Client {
                }
        }
 
+       /**
+        * Cancels a job identified by the job id.
+        * @param jobId the job id
+        * @throws Exception In case an error occurred.
+        */
+       public void cancel(JobID jobId) throws Exception {
+               final FiniteDuration timeout = 
AkkaUtils.getTimeout(configuration);
+
+               ActorSystem actorSystem;
+               try {
+                       actorSystem = 
JobClient.startJobClientActorSystem(configuration);
+               } catch (Exception e) {
+                       throw new ProgramInvocationException("Could start 
client actor system.", e);
+               }
+
+               ActorRef jobManager;
+               try {
+                       jobManager = 
JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, 
timeout);
+               } catch (Exception e) {
+                       LOG.error("Error in getting the remote reference for 
the job manager", e);
+                       throw new ProgramInvocationException("Failed to resolve 
JobManager", e);
+               }
+
+               Future<Object> response = Patterns.ask(jobManager, new 
JobManagerMessages.CancelJob(jobId), new Timeout(timeout));
+               Object result = Await.result(response, timeout);
+
+               if (result instanceof JobManagerMessages.CancellationSuccess) {
+                       LOG.debug("Job cancellation with ID " + jobId + " 
succeeded.");
+               } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
+                       Throwable t = ((JobManagerMessages.CancellationFailure) 
result).cause();
+                       LOG.debug("Job cancellation with ID " + jobId + " 
failed.", t);
+                       throw new Exception("Failed to cancel the job because 
of \n" + t.getMessage());
+               } else {
+                       throw new Exception("Unknown message received while 
cancelling.");
+               }
+       }
+
+
        // 
--------------------------------------------------------------------------------------------
        
        public static final class OptimizerPlanEnvironment extends 
ExecutionEnvironment {

Reply via email to