Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/573#discussion_r30113961
--- Diff:
flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import
org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link
Nimbus}{@code .Client} at once, to interact with
+ * Flink's JobManager instead of Storm's Nimbus.
+ */
+public class FlinkClient {
+ /**
+ * The jobmanager's host name.
+ */
+ private final String jobManagerHost;
+ /**
+ * The jobmanager's rpc port.
+ */
+ private final int jobManagerPort;
+ /**
+ * The user specified timeout in milliseconds.
+ */
+ private final String timeout;
+
+
+
+ /*
+ * The following methods are derived from
"backtype.storm.utils.NimbusClient"
+ */
+
+ /**
+ * Instantiates a new {@link FlinkClient} for the given configuration,
host name, and port. If values for
+ * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of
the given configuration are ignored.
+ *
+ * @param conf
+ * A configuration.
+ * @param host
+ * The jobmanager's host name.
+ * @param port
+ * The jobmanager's rpc port.
+ */
+ public FlinkClient(final Map<?, ?> conf, final String host, final int
port) {
+ this(conf, host, port, null);
+ }
+
+ /**
+ * Instantiates a new {@link FlinkClient} for the given configuration,
host name, and port. If values for
+ * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of
the given configuration are ignored.
+ *
+ * @param conf
+ * A configuration.
+ * @param host
+ * The jobmanager's host name.
+ * @param port
+ * The jobmanager's rpc port.
+ * @param timeout
+ */
+ public FlinkClient(final Map<?, ?> conf, final String host, final int
port, final Integer timeout) {
+ this.jobManagerHost = host;
+ this.jobManagerPort = port;
+ if(timeout != null) {
+ this.timeout = timeout + " ms";
+ } else {
+ this.timeout = null;
+ }
+ }
+
+
+
+ /**
+ * Returns a {@link FlinkClient} that uses the configured {@link
Config#NIMBUS_HOST} and
+ * {@link Config#NIMBUS_THRIFT_PORT} as JobManager address.
+ *
+ * @param conf
+ * Configuration that contains the jobmanager's hostname and
port.
+ *
+ * @return A configured {@link FlinkClient}.
+ */
+ public static FlinkClient
getConfiguredClient(@SuppressWarnings("rawtypes") final Map conf) {
+ final String nimbusHost = (String)conf.get(Config.NIMBUS_HOST);
+ final int nimbusPort =
Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
+ return new FlinkClient(conf, nimbusHost, nimbusPort);
+ }
+
+
+
+ /**
+ * Return a reference to itself.
+ *
+ * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link
Nimbus}{@code .Client}, at once.
+ *
+ * @return A reference to itself.
+ */
+ public FlinkClient getClient() {
+ return this;
+ }
+
+ public void close() {/* nothing to do */}
+
+
+ /*
+ * The following methods are derived from
"backtype.storm.generated.Nimubs.Client"
+ */
+
+ /**
+ * Parameter {@code uploadedJarLocation} is actually used to point to
the local jar, because Flink does not support
+ * uploading a jar file before hand. Jar files are always uploaded
directly when a program is submitted.
+ */
+ public void submitTopology(final String name, final String
uploadedJarLocation, final String jsonConf, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ this.submitTopologyWithOpts(name, uploadedJarLocation,
jsonConf, topology, null);
+ }
+
+ /**
+ * Parameter {@code uploadedJarLocation} is actually used to point to
the local jar, because Flink does not support
+ * uploading a jar file before hand. Jar files are always uploaded
directly when a program is submitted.
+ */
+ public void submitTopologyWithOpts(final String name, final String
uploadedJarLocation, final String jsonConf, final FlinkTopology topology, final
SubmitOptions options)
+ throws AlreadyAliveException, InvalidTopologyException {
+
+ if(this.getTopologyJobId(name) != null) {
+ throw new AlreadyAliveException();
+ }
+
+ final File uploadedJarFile = new File(uploadedJarLocation);
+ try {
+ JobWithJars.checkJarFile(uploadedJarFile);
+ } catch(final IOException e) {
+ throw new RuntimeException("Problem with jar file " +
uploadedJarFile.getAbsolutePath(), e);
+ }
+
+ final List<File> jarFiles = new ArrayList<File>();
+ jarFiles.add(uploadedJarFile);
+
+ final JobGraph jobGraph =
topology.getStreamGraph().getJobGraph(name);
+ jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+
+ final Configuration configuration =
jobGraph.getJobConfiguration();
+
+ final Client client;
+ try {
+ client = new Client(new
InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
+ JobWithJars.buildUserCodeClassLoader(jarFiles,
JobWithJars.class.getClassLoader()), -1);
+ } catch(final UnknownHostException e) {
+ throw new RuntimeException("Cannot execute job due to
UnknownHostException", e);
+ }
+
+ try {
+ client.run(jobGraph, false);
+ } catch(final ProgramInvocationException e) {
+ throw new RuntimeException("Cannot execute job due to
ProgramInvocationException", e);
+ }
+ }
+
+ public void killTopology(final String name) throws NotAliveException {
+ this.killTopologyWithOpts(name, null);
+ }
+
+ public void killTopologyWithOpts(final String name, final KillOptions
options) throws NotAliveException {
+ final JobID jobId = this.getTopologyJobId(name);
+ if(jobId == null) {
+ throw new NotAliveException();
+ }
+
+ try {
+ final ActorRef jobManager = this.getJobManager();
+
+ if(options != null) {
+ try {
+ Thread.sleep(1000 *
options.get_wait_secs());
+ } catch(final InterruptedException e) {
+ e.printStackTrace();
--- End diff --
rethrow as runtime e.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---