Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/750#discussion_r37988485
--- Diff:
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
---
@@ -205,30 +205,32 @@ public void killTopology(final String name) throws
NotAliveException {
public void killTopologyWithOpts(final String name, final KillOptions
options) throws NotAliveException {
final JobID jobId = this.getTopologyJobId(name);
if (jobId == null) {
- throw new NotAliveException();
+ throw new NotAliveException("Storm topology with name "
+ name + " not found.");
}
- try {
- final ActorRef jobManager = this.getJobManager();
-
- if (options != null) {
- try {
- Thread.sleep(1000 *
options.get_wait_secs());
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- final FiniteDuration askTimeout = this.getTimeout();
- final Future<Object> response =
Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+ if (options != null) {
try {
- Await.result(response, askTimeout);
- } catch (final Exception e) {
- throw new RuntimeException("Killing topology "
+ name + " with Flink job ID " + jobId + " failed", e);
+ Thread.sleep(1000 * options.get_wait_secs());
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
}
- } catch (final IOException e) {
- throw new RuntimeException("Could not connect to Flink
JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
+ }
+
+ final Configuration configuration =
GlobalConfiguration.getConfiguration();
+
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
this.jobManagerHost);
+
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
this.jobManagerPort);
+
+ final Client client;
+ try {
+ client = new Client(configuration,
JobWithJars.class.getClassLoader());
+ } catch (final UnknownHostException e) {
+ throw new RuntimeException("Cannot execute job due to
UnknownHostException", e);
+ }
+
+ try {
+ client.stop(jobId);
+ } catch (final ProgramStopException e) {
+ throw new RuntimeException("Cannot execute job due to
ProgramInvocationException", e);
--- End diff --
You've received a `ProgramStopException` and not a
`ProgramInvocationException`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---