[ https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15024346#comment-15024346 ]
ASF GitHub Bot commented on FLINK-2837:
---------------------------------------
Github user mjsax commented on a diff in the pull request:
https://github.com/apache/flink/pull/1398#discussion_r45726600
--- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java ---
@@ -15,75 +16,468 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.flink.storm.api;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
 */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+	/** All declared streams and output schemas by operator ID */
+	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+	/** All spout and bolt declarers by their ID */
+	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+	private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt =
+			new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+	final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
-	/** The number of declared tasks for the whole program (ie, sum over all dops) */
-	private int numberOfTasks = 0;
+ private final TopologyBuilder builder;
-	public FlinkTopology() {
-		// Set default parallelism to 1, to mirror Storm default behavior
-		super.setParallelism(1);
+	// needs to be a class member for internal testing purposes
+	private final StormTopology stormTopology;
+
+ private final Map<String, IRichSpout> spouts;
+ private final Map<String, IRichBolt> bolts;
+
+ private final StreamExecutionEnvironment env;
+
+ private FlinkTopology(TopologyBuilder builder) {
+ this.builder = builder;
+ this.stormTopology = builder.createTopology();
+ // extract the spouts and bolts
+ this.spouts = getPrivateField("_spouts");
+ this.bolts = getPrivateField("_bolts");
+
+ this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Kick off the translation immediately
+ translateTopology();
}
	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
-	 * FlinkClient}.
	 *
-	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * Creates a Flink program that uses the specified spouts and bolts.
+	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
+	 * @return A Flink topology which may be executed.
	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
+	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
+		return new FlinkTopology(stormBuilder);
	}
	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * Returns the underlying Flink {@link StreamExecutionEnvironment} for the Storm topology.
+	 * @return The contextual environment.
	 */
-	@Override
-	public JobExecutionResult execute(final String jobName) throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return this.env;
	}
	/**
-	 * Increased the number of declared tasks of this program by the given value.
-	 *
-	 * @param dop
-	 *             The dop of a new operator that increases the number of overall tasks.
+	 * Directly executes the Storm topology, based on the current context (local when run in the IDE and
+	 * remote when executed through ./bin/flink).
+	 * @return The execution result
+	 * @throws Exception
	 */
-	public void increaseNumberOfTasks(final int dop) {
-		assert (dop > 0);
-		this.numberOfTasks += dop;
+	public JobExecutionResult execute() throws Exception {
+		return env.execute();
+	}
+
+
+ @SuppressWarnings("unchecked")
+	private <T> Map<String, T> getPrivateField(String field) {
+		try {
+			Field f = builder.getClass().getDeclaredField(field);
+			f.setAccessible(true);
+			return copyObject((Map<String, T>) f.get(builder));
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
+		}
+	}
+
+	private <T> T copyObject(T object) {
+		try {
+			return InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(object),
+					getClass().getClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Failed to copy object.", e);
+		}
	}
	/**
-	 * Return the number of required tasks to execute this program.
-	 *
-	 * @return the number of required tasks to execute this program
+	 * Translates the Storm topology into a Flink program.
	 */
-	public int getNumberOfTasks() {
-		return this.numberOfTasks;
+	private void translateTopology() {
+
+		unprocessedInputsPerBolt.clear();
+		outputStreams.clear();
+		declarers.clear();
+		availableInputs.clear();
+
+		// Storm defaults to parallelism 1
+		env.setParallelism(1);
+
+ /* Translation of topology */
+
+
+		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+			final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
+			this.outputStreams.put(spoutId, sourceStreams);
+			declarers.put(spoutId, declarer);
+
+			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
+			final DataStreamSource<?> source;
+
+			if (sourceStreams.size() == 1) {
+				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
+				spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
+				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+
+				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
+						declarer.getOutputType(outputStreamId));
+
+				outputStreams.put(outputStreamId, src);
+				source = src;
+			} else {
+				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+						userSpout);
+				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+				@SuppressWarnings({ "unchecked", "rawtypes" })
+				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
+						spoutWrapperMultipleOutputs, spoutId,
+						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
+
+				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
+						.split(new StormStreamSelector<Tuple>());
+				for (String streamId : sourceStreams.keySet()) {
+					outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
+				}
+				source = multiSource;
+			}
+			availableInputs.put(spoutId, outputStreams);
+
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+			if (common.is_set_parallelism_hint()) {
+				int dop = common.get_parallelism_hint();
+				source.setParallelism(dop);
+			} else {
+				common.set_parallelism_hint(1);
+			}
+		}
+
+		/**
+		 * 1. Connect all spout streams with bolt streams.
+		 * 2. Then proceed with the bolt streams already connected.
+		 *
+		 * Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		 * its producer; thus, we might need to repeat multiple times.
+		 */
+		boolean makeProgress = true;
+		while (bolts.size() > 0) {
+			if (!makeProgress) {
+				throw new RuntimeException(
+						"Unable to build Topology. Could not connect the following bolts: "
+								+ bolts.keySet());
+			}
+			makeProgress = false;
+
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
+			while (boltsIterator.hasNext()) {
+
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = copyObject(bolt.getValue());
+
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessedInputsPerBolt.get(boltId);
+				if (unprocessedBoltInputs == null) {
+					unprocessedBoltInputs = new HashSet<>();
+					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
+					unprocessedInputsPerBolt.put(boltId, unprocessedBoltInputs);
+				}
+
+				// check if all inputs are available
+				final int numberOfInputs = unprocessedBoltInputs.size();
+				int inputsAvailable = 0;
+				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
+					final String producerId = entry.getKey().get_componentId();
+					final String streamId = entry.getKey().get_streamId();
+					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
+					if (streams != null && streams.get(streamId) != null) {
+						inputsAvailable++;
+					}
+				}
+
+				if (inputsAvailable != numberOfInputs) {
+					// traverse other bolts first until inputs are available
+					continue;
+				} else {
+					makeProgress = true;
+					boltsIterator.remove();
+				}
+
+				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);
+
+				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
+					final GlobalStreamId streamId = input.getKey();
+					final Grouping grouping = input.getValue();
+
+					final String producerId = streamId.get_componentId();
+
+					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
+
+					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
+				}
+
+				final Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
+
+				final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next();
+				GlobalStreamId streamId = firstInput.getKey();
+				DataStream<Tuple> inputStream = firstInput.getValue();
+
+				final SingleOutputStreamOperator<?, ?> outputStream;
+
+				switch (numberOfInputs) {
+					case 1:
+						outputStream = createOutput(boltId, userBolt, streamId, inputStream);
+						break;
+					case 2:
+						Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next();
+						GlobalStreamId streamId2 = secondInput.getKey();
+						DataStream<Tuple> inputStream2 = secondInput.getValue();
+						outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
+						break;
+					default:
+						throw new UnsupportedOperationException("Don't know how to translate a bolt "
+								+ boltId + " with " + numberOfInputs + " inputs.");
+				}
+
+				if (common.is_set_parallelism_hint()) {
+					int dop = common.get_parallelism_hint();
+					outputStream.setParallelism(dop);
+				} else {
+					common.set_parallelism_hint(1);
+				}
+
+			}
+		}
}
+	private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
+			GlobalStreamId streamId, Grouping grouping,
+			Map<String, DataStream<Tuple>> producer) {
+
+		Preconditions.checkNotNull(userBolt);
+		Preconditions.checkNotNull(boltId);
+		Preconditions.checkNotNull(streamId);
+		Preconditions.checkNotNull(grouping);
+		Preconditions.checkNotNull(producer);
+
+		final String producerId = streamId.get_componentId();
+		final String inputStreamId = streamId.get_streamId();
+
+		DataStream<Tuple> inputStream = producer.get(inputStreamId);
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarers.put(boltId, declarer);
+		userBolt.declareOutputFields(declarer);
+		this.outputStreams.put(boltId, declarer.outputStreams);
+
+		// if producer was processed already
+		if (grouping.is_set_shuffle()) {
+			// Storm uses a round-robin shuffle strategy
+			inputStream = inputStream.rebalance();
+		} else if (grouping.is_set_fields()) {
+			// global grouping is emulated in Storm via an empty fields grouping list
+			final List<String> fields = grouping.get_fields();
+			if (fields.size() > 0) {
+				FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+				inputStream = inputStream.keyBy(prodDeclarer
+						.getGroupingFieldIndexes(inputStreamId, grouping.get_fields()));
+			} else {
+				inputStream = inputStream.global();
+			}
+		} else if (grouping.is_set_all()) {
+			inputStream = inputStream.broadcast();
+		} else if (!grouping.is_set_local_or_shuffle()) {
+			throw new UnsupportedOperationException(
+					"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+		}
+
+		return inputStream;
+	}
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) {
+		return createOutput(boltId, bolt, streamId, inputStream, null, null);
+	}
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt,
+			GlobalStreamId streamId, DataStream<Tuple> inputStream,
+			GlobalStreamId streamId2, DataStream<Tuple> inputStream2) {
+		Preconditions.checkNotNull(boltId);
+		Preconditions.checkNotNull(streamId);
--- End diff ---
`assert`
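
The one-word comment above presumably suggests replacing the `Preconditions.checkNotNull` calls on internal arguments with Java `assert` statements, which are only evaluated when the JVM runs with `-ea` and therefore cost nothing in production. A minimal sketch of the contrast (hypothetical class, not part of the PR):
{noformat}
import com.google.common.base.Preconditions;

public class NullCheckStyles {

	// Precondition: always evaluated, fails fast for any caller.
	static void withPrecondition(Object boltId) {
		Preconditions.checkNotNull(boltId, "boltId must not be null");
	}

	// Assertion: documents an internal invariant; only evaluated under -ea.
	static void withAssert(Object boltId) {
		assert boltId != null : "boltId must not be null";
	}

	public static void main(String[] args) {
		withPrecondition("bolt-1");
		withAssert("bolt-1");
	}
}
{noformat}
For orientation, the API introduced in the diff would be driven roughly like this (a sketch based only on the methods shown above; `builder` is an ordinary Storm `TopologyBuilder`):
{noformat}
import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;

public class RunTopology {
	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		// ... setSpout(...)/setBolt(...) calls as in any Storm program ...

		// Translate the Storm topology and execute it on Flink,
		// locally or remotely depending on the current context.
		FlinkTopology.createTopology(builder).execute();
	}
}
{noformat}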
> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
> Key: FLINK-2837
> URL: https://issues.apache.org/jira/browse/FLINK-2837
> Project: Flink
> Issue Type: Bug
> Components: Storm Compatibility
> Reporter: Matthias J. Sax
> Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead
> of unioning the incoming streams, it replicates the consuming bolt, and each
> (logical) instance processes only one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
> .shuffleGrouping(spoutId1)
> .shuffleGrouping(spoutId2)
> .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
> .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.
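
For comparison, the expected semantics in plain Flink are obtained by unioning the producer streams before the consumer, rather than replicating the consumer; a minimal DataStream sketch (placeholder sources standing in for the three spouts, not the actual FiniteRandomSpout/MergerBolt classes):
{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionSemantics {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Three independent sources, standing in for the three spouts.
		DataStream<Long> src1 = env.fromElements(1L, 2L, 3L);
		DataStream<Long> src2 = env.fromElements(4L, 5L);
		DataStream<Long> src3 = env.fromElements(6L, 7L, 8L);

		// The consuming operator must see the union of all its inputs,
		// not one replica of itself per input stream.
		src1.union(src2, src3).print();

		env.execute();
	}
}
{noformat}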
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)