Repository: flink
Updated Branches:
  refs/heads/master af1e03e13 -> b489c3673


[FLINK-2825] FlinkClient.killTopology fails due to missing leader session ID


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b489c367
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b489c367
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b489c367

Branch: refs/heads/master
Commit: b489c367385453404671d0136ea59a9e63434ea7
Parents: af1e03e
Author: mjsax <[email protected]>
Authored: Tue Oct 6 23:27:45 2015 +0200
Committer: mjsax <[email protected]>
Committed: Tue Oct 6 23:27:45 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/storm/api/FlinkClient.java | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b489c367/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 5f0ee21..b19cb38 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 
@@ -211,31 +210,34 @@ public class FlinkClient {
        public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
                final JobID jobId = this.getTopologyJobId(name);
                if (jobId == null) {
-                       throw new NotAliveException();
+                       throw new NotAliveException("Storm topology with name " + name + " not found.");
                }
 
-               try {
-                       final ActorRef jobManager = this.getJobManager();
-
-                       if (options != null) {
-                               try {
-                                       Thread.sleep(1000 * options.get_wait_secs());
-                               } catch (final InterruptedException e) {
-                                       throw new RuntimeException(e);
-                               }
-                       }
-
-                       final FiniteDuration askTimeout = this.getTimeout();
-                       final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+               if (options != null) {
                        try {
-                               Await.result(response, askTimeout);
-                       } catch (final Exception e) {
-                               throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+                               Thread.sleep(1000 * options.get_wait_secs());
+                       } catch (final InterruptedException e) {
+                               throw new RuntimeException(e);
                        }
+               }
+
+               final Configuration configuration = GlobalConfiguration.getConfiguration();
+               configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
+               configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
+
+               final Client client;
+               try {
+                       client = new Client(configuration);
                } catch (final IOException e) {
-                       throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-                                       + ":" + this.jobManagerPort, e);
+                       throw new RuntimeException("Could not establish a connection to the job manager", e);
                }
+
+               try {
+                       client.cancel(jobId);
+               } catch (final Exception e) {
+                       throw new RuntimeException("Cannot stop job.", e);
+               }
+
        }
 
        // Flink specific additional methods

Reply via email to