[
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15027162#comment-15027162
]
ASF GitHub Bot commented on FLINK-2837:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1398#discussion_r45892911
--- Diff:
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
---
@@ -15,75 +16,474 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.storm.api;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented
in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link
StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link
FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink
program.
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not
supported.</strong>
*/
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+ /** All declared streams and output schemas by operator ID */
+ private final HashMap<String, HashMap<String, Fields>> outputStreams =
new HashMap<String, HashMap<String, Fields>>();
+ /** All spouts&bolts declarers by their ID */
+ private final HashMap<String, FlinkOutputFieldsDeclarer> declarers =
new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+ private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>
unprocessdInputsPerBolt =
+ new HashMap<String, Set<Entry<GlobalStreamId,
Grouping>>>();
+
+ final HashMap<String, HashMap<String, DataStream<Tuple>>>
availableInputs = new HashMap<>();
- /** The number of declared tasks for the whole program (ie, sum over
all dops) */
- private int numberOfTasks = 0;
+ private final TopologyBuilder builder;
- public FlinkTopology() {
- // Set default parallelism to 1, to mirror Storm default
behavior
- super.setParallelism(1);
+ // needs to be a class member for internal testing purpose
+ private final StormTopology stormTopology;
+
+ private final Map<String, IRichSpout> spouts;
+ private final Map<String, IRichBolt> bolts;
+
+ private final StreamExecutionEnvironment env;
+
+ private FlinkTopology(TopologyBuilder builder) {
+ this.builder = builder;
+ this.stormTopology = builder.createTopology();
+ // extract the spouts and bolts
+ this.spouts = getPrivateField("_spouts");
+ this.bolts = getPrivateField("_bolts");
+
+ this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Kick off the translation immediately
+ translateTopology();
}
/**
- * Is not supported. In order to execute use {@link FlinkLocalCluster},
{@link FlinkSubmitter}, or {@link
- * FlinkClient}.
*
- * @throws UnsupportedOperationException
- * at every invocation
+ * Creates a Flink program that uses the specified spouts and bolts.
+ * @param stormBuilder The storm topology builder to use for creating
the Flink topology.
+ * @return A Flink Topology which may be executed.
*/
- @Override
- public JobExecutionResult execute() throws Exception {
- throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly.
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
+ public static FlinkTopology createTopology(TopologyBuilder
stormBuilder) {
+ return new FlinkTopology(stormBuilder);
}
/**
- * Is not supported. In order to execute use {@link FlinkLocalCluster},
{@link FlinkSubmitter} or {@link
- * FlinkClient}.
- *
- * @throws UnsupportedOperationException
- * at every invocation
+ * Returns the underlying Flink ExecutionEnvironment for the Storm
topology.
+ * @return The contextual environment.
*/
- @Override
- public JobExecutionResult execute(final String jobName) throws
Exception {
- throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly.
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
- "instead.");
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return this.env;
}
/**
- * Increased the number of declared tasks of this program by the given
value.
- *
- * @param dop
- * The dop of a new operator that increases the number of
overall tasks.
+ * Directly executes the Storm topology based on the current context
(local when in IDE and
+ * remote when executed thorugh ./bin/flink).
+ * @return The execution result
+ * @throws Exception
*/
--- End diff --
Yes, execute just throws an Exception. Nothing to explain here.
StreamExecutionEnvironment says
> * @throws Exception which occurs during job execution.
> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
> Issue Type: Bug
> Components: Storm Compatibility
> Reporter: Matthias J. Sax
> Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead
> of unioning the incoming streams, it replicates the consuming bolt, and each
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
> .shuffleGrouping(spoutId1)
> .shuffleGrouping(spoutId2)
> .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
> .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)