Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/573#discussion_r28344339
--- Diff:
flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+/**
+ * {@link FlinkSubmitter} mimics a {@link StromSubmitter} to submit Storm
topologies to a Flink cluster.
+ *
+ * @author mjsax
+ */
+public class FlinkSubmitter {
+ public static Logger logger =
LoggerFactory.getLogger(FlinkSubmitter.class);
+
+ private static Nimbus.Iface localNimbus = null;
+
+ public static void setLocalNimbus(final Nimbus.Iface
localNimbusHandler) {
+ FlinkSubmitter.localNimbus = localNimbusHandler;
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or
until explicitly killed.
+ *
+ *
+ * @param name
+ * the name of the storm.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}.
+ * @param topology
+ * the processing to execute.
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ public static void submitTopology(final String name, final Map<?, ?>
stormConf, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology, (SubmitOptions)null,
(FlinkProgressListener)null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or
until explicitly killed.
+ *
+ * @param name
+ * the name of the storm.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}.
+ * @param topology
+ * the processing to execute.
+ * @param opts
+ * to manipulate the starting of the topology.
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ public static void submitTopology(final String name, final Map<?, ?>
stormConf, final FlinkTopology topology, final SubmitOptions opts)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology, opts,
(FlinkProgressListener)null);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or
until explicitly killed. The given
+ * {@link FlinkProgressListener} is ignored because progress bars are
not supported by Flink.
+ *
+ *
+ * @param name
+ * the name of the storm.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}.
+ * @param topology
+ * the processing to execute.
+ * @param opts
+ * to manipulate the starting of the topology
+ * @param progressListener
+ * to track the progress of the jar upload process
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static void submitTopology(final String name, final Map
stormConf, final FlinkTopology topology, final SubmitOptions opts, final
FlinkProgressListener progressListener)
+ throws AlreadyAliveException, InvalidTopologyException {
+ if(!Utils.isValidConf(stormConf)) {
+ throw new IllegalArgumentException("Storm conf is not
valid. Must be json-serializable");
+ }
+
+ final HashMap localStormConf = new HashMap(stormConf);
+ localStormConf.putAll(Utils.readCommandLineOpts());
+
+ final Map conf = Utils.readStormConfig();
+ conf.putAll(localStormConf);
+
+ final String serConf = JSONValue.toJSONString(localStormConf);
+
+ if(localNimbus != null) {
+ // TODO: check what is the difference about localNimbus
vs LocalCluster
+ // LOG.info("Submitting topology " + name + " in local
mode");
--- End diff --
I ask at the dev-storm mailing list and got the following answer:
"LocalNimbus is something that is not really used. It provides a way for
the StormSubmitter to be used when submitting a topology to a local mode
cluster. No one uses it and in the past it didn't work, not sure if it works
now or not. It is probably best to just not use it unless you really need to
abstract out local mode from the topology code used to submit a topology.
- Bobby"
We can ignore this case, and I will remove the code.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---