Repository: flink Updated Branches: refs/heads/master 4a3151681 -> fa88d9eb1
[FLINK-2862] [Storm Compatibility] FlinkTopologyBuilder should use proper generic types This closes #1274 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa88d9eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa88d9eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa88d9eb Branch: refs/heads/master Commit: fa88d9eb1b88641d6ca03b38e2058d30971c7a2f Parents: 4a31516 Author: mjsax <[email protected]> Authored: Tue Oct 20 18:06:00 2015 +0200 Committer: mjsax <[email protected]> Committed: Sat Oct 24 13:04:10 2015 +0200 ---------------------------------------------------------------------- docs/apis/storm_compatibility.md | 8 +- .../storm/api/FlinkOutputFieldsDeclarer.java | 6 +- .../flink/storm/api/FlinkTopologyBuilder.java | 107 +++++++++++-------- .../storm/util/SplitStreamTypeKeySelector.java | 47 -------- .../flink/storm/wrappers/BoltWrapper.java | 11 +- .../api/FlinkOutputFieldsDeclarerTest.java | 12 +-- 6 files changed, 71 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/docs/apis/storm_compatibility.md ---------------------------------------------------------------------- diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md index bf80d4e..e3e11ab 100644 --- a/docs/apis/storm_compatibility.md +++ b/docs/apis/storm_compatibility.md @@ -209,7 +209,6 @@ If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., ther For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitStream.select(...)`. Flink provides the predefined output selector `StormStreamSelector<T>` for `.split(...)` already. Furthermore, the wrapper type `SplitStreamTuple<T>` can be removed using `SplitStreamMapper<T>`. 
-If a data stream of type `SplitStreamTuple<T>` is used as input for a Bolt, it is **not** required to strip the wrapper – `BoltWrapper` removes it automatically. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -221,10 +220,9 @@ DataStream<SplitStreamType<SomeType>> multiStream = ... SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>()); -// remove SplitStreamMapper to get data stream of type SomeType -DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>).returns(SomeType.classs); -// apply Bolt directly, without stripping SplitStreamType -DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt for further processing */); +// remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType +DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class); +DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class); // do further processing on s1 and s2 [...] 
http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java index 88d2dfe..febd56d 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java @@ -91,7 +91,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { * @throws IllegalArgumentException * If no output schema was declared for the specified stream or if more then 25 attributes got declared. */ - TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException { + TypeInformation<Tuple> getOutputType(final String streamId) throws IllegalArgumentException { if (streamId == null) { return null; } @@ -105,9 +105,7 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { Tuple t; final int numberOfAttributes = outputSchema.size(); - if (numberOfAttributes == 1) { - return TypeExtractor.getForClass(Object.class); - } else if (numberOfAttributes <= 25) { + if (numberOfAttributes <= 25) { try { t = Tuple.getTupleClass(numberOfAttributes).newInstance(); } catch (final InstantiationException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java index 
9c41d88..8a88eac 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java @@ -33,9 +33,10 @@ import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.storm.util.SplitStreamMapper; import org.apache.flink.storm.util.SplitStreamType; -import org.apache.flink.storm.util.SplitStreamTypeKeySelector; import org.apache.flink.storm.util.StormStreamSelector; import org.apache.flink.storm.wrappers.BoltWrapper; import org.apache.flink.storm.wrappers.SpoutWrapper; @@ -77,14 +78,13 @@ public class FlinkTopologyBuilder { /** * Creates a Flink program that uses the specified spouts and bolts. */ - @SuppressWarnings({"rawtypes", "unchecked"}) public FlinkTopology createTopology() { this.stormTopology = this.stormBuilder.createTopology(); final FlinkTopology env = new FlinkTopology(); env.setParallelism(1); - final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>(); + final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>(); for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) { final String spoutId = spout.getKey(); @@ -96,24 +96,37 @@ public class FlinkTopologyBuilder { this.outputStreams.put(spoutId, sourceStreams); declarers.put(spoutId, declarer); - final SpoutWrapper spoutWrapper = new SpoutWrapper(userSpout); - spoutWrapper.setStormTopology(stormTopology); - DataStreamSource source; - final HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>(); + final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, 
DataStream<Tuple>>(); + final DataStreamSource<?> source; + if (sourceStreams.size() == 1) { + final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout); + spoutWrapperSingleOutput.setStormTopology(stormTopology); + final String outputStreamId = (String) sourceStreams.keySet().toArray()[0]; - source = env.addSource(spoutWrapper, spoutId, + + DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId, declarer.getOutputType(outputStreamId)); - outputStreams.put(outputStreamId, source); + + outputStreams.put(outputStreamId, src); + source = src; } else { - source = env.addSource(spoutWrapper, spoutId, - TypeExtractor.getForClass(SplitStreamType.class)); - SplitStream splitSource = source.split(new StormStreamSelector()); + final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>( + userSpout); + spoutWrapperMultipleOutputs.setStormTopology(stormTopology); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource( + spoutWrapperMultipleOutputs, spoutId, + (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class)); + SplitStream<SplitStreamType<Tuple>> splitSource = multiSource + .split(new StormStreamSelector<Tuple>()); for (String streamId : sourceStreams.keySet()) { - outputStreams.put(streamId, splitSource.select(streamId)); + outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>())); } + source = multiSource; } availableInputs.put(spoutId, outputStreams); @@ -171,11 +184,11 @@ public class FlinkTopologyBuilder { final String producerId = stormInputStream.getKey().get_componentId(); final String inputStreamId = stormInputStream.getKey().get_streamId(); - final HashMap<String, DataStream> producer = availableInputs.get(producerId); + final HashMap<String, DataStream<Tuple>> producer = availableInputs.get(producerId); if (producer != null) { makeProgress = 
true; - DataStream inputStream = producer.get(inputStreamId); + DataStream<Tuple> inputStream = producer.get(inputStreamId); if (inputStream != null) { final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); userBolt.declareOutputFields(declarer); @@ -193,18 +206,9 @@ public class FlinkTopologyBuilder { final List<String> fields = grouping.get_fields(); if (fields.size() > 0) { FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId); - if (producer.size() == 1) { - inputStream = inputStream.keyBy(prodDeclarer - .getGroupingFieldIndexes(inputStreamId, - grouping.get_fields())); - } else { - inputStream = inputStream - .keyBy(new SplitStreamTypeKeySelector( - inputStream.getType(), - prodDeclarer.getGroupingFieldIndexes( - inputStreamId, - grouping.get_fields()))); - } + inputStream = inputStream.keyBy(prodDeclarer + .getGroupingFieldIndexes(inputStreamId, + grouping.get_fields())); } else { inputStream = inputStream.global(); } @@ -215,43 +219,56 @@ public class FlinkTopologyBuilder { "Flink only supports (local-or-)shuffle, fields, all, and global grouping"); } - final SingleOutputStreamOperator outputStream; - final BoltWrapper boltWrapper; + final SingleOutputStreamOperator<?, ?> outputStream; + if (boltOutputStreams.size() < 2) { // single output stream or sink String outputStreamId = null; if (boltOutputStreams.size() == 1) { outputStreamId = (String) boltOutputStreams.keySet().toArray()[0]; } - final TypeInformation<?> outType = declarer + final TypeInformation<Tuple> outType = declarer .getOutputType(outputStreamId); - boltWrapper = new BoltWrapper(userBolt, this.outputStreams - .get(producerId).get(inputStreamId)); - outputStream = inputStream.transform(boltId, outType, boltWrapper); + final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>( + userBolt, this.outputStreams.get(producerId).get( + inputStreamId)); + boltWrapperSingleOutput.setStormTopology(stormTopology); + + final 
SingleOutputStreamOperator<Tuple, ?> outStream = inputStream + .transform(boltId, outType, boltWrapperSingleOutput); if (outType != null) { // only for non-sink nodes - final HashMap<String, DataStream> op = new HashMap<String, DataStream>(); - op.put(outputStreamId, outputStream); + final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>(); + op.put(outputStreamId, outStream); availableInputs.put(boltId, op); } + outputStream = outStream; } else { - final TypeInformation<?> outType = TypeExtractor + final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>( + userBolt, this.outputStreams.get(producerId).get( + inputStreamId)); + boltWrapperMultipleOutputs.setStormTopology(stormTopology); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor .getForClass(SplitStreamType.class); - boltWrapper = new BoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId)); - outputStream = inputStream.transform(boltId, outType, boltWrapper); + final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream + .transform(boltId, outType, boltWrapperMultipleOutputs); - final SplitStream splitStreams = outputStream - .split(new StormStreamSelector()); + final SplitStream<SplitStreamType<Tuple>> splitStream = multiStream + .split(new StormStreamSelector<Tuple>()); - final HashMap<String, DataStream> op = new HashMap<String, DataStream>(); + final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>(); for (String outputStreamId : boltOutputStreams.keySet()) { - op.put(outputStreamId, splitStreams.select(outputStreamId)); + op.put(outputStreamId, + splitStream.select(outputStreamId).map( + new SplitStreamMapper<Tuple>())); } availableInputs.put(boltId, op); + outputStream = multiStream; } - boltWrapper.setStormTopology(stormTopology); int dop 
= 1; if (common.is_set_parallelism_hint()) { @@ -342,7 +359,7 @@ public class FlinkTopologyBuilder { * the basic bolt * @param parallelism_hint * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a - * process somwehere around the cluster. + * process somewhere around the cluster. * @return use the returned object to declare the inputs to this component */ public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) { @@ -371,7 +388,7 @@ public class FlinkTopologyBuilder { * outputs. * @param parallelism_hint * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a - * process somwehere around the cluster. + * process somewhere around the cluster. * @param spout * the spout */ http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java deleted file mode 100644 index 71e5b86..0000000 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.storm.util; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.streaming.util.keys.KeySelectorUtil; -import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector; - -/** - * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via - * {@link StormStreamSelector} from a Spout or Bolt that declares multiple output streams. - * - * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular - * {@link ArrayKeySelector} on it. - */ -public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> { - private static final long serialVersionUID = 4672434660037669254L; - - private final ArrayKeySelector<Tuple> selector; - - public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... 
fields) { - this.selector = KeySelectorUtil.getSelectorForArray(fields, type); - } - - @Override - public Tuple getKey(SplitStreamType<Tuple> value) throws Exception { - return selector.getKey(value.value); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index f0913e8..12d967a 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -61,7 +60,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements private final Fields inputSchema; /** The original Storm topology. */ protected StormTopology stormTopology; - + /** * We have to use this because Operators must output * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}. 
@@ -239,17 +238,11 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements this.bolt.cleanup(); } - @SuppressWarnings("unchecked") @Override public void processElement(final StreamRecord<IN> element) throws Exception { this.flinkCollector.setTimestamp(element.getTimestamp()); IN value = element.getValue(); - if (value instanceof SplitStreamType) { - this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value, - inputSchema)); - } else { - this.bolt.execute(new StormTuple<IN>(value, inputSchema)); - } + this.bolt.execute(new StormTuple<IN>(value, inputSchema)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fa88d9eb/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java index 49de476..8f0ad3b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java @@ -20,7 +20,6 @@ import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; import org.apache.flink.storm.util.AbstractTest; import org.junit.Assert; @@ -30,8 +29,6 @@ import java.util.LinkedList; public class FlinkOutputFieldsDeclarerTest extends AbstractTest { - - @Test public void testNull() { Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null)); @@ -100,13 +97,8 @@ public class FlinkOutputFieldsDeclarerTest extends AbstractTest { for (String stream : streams) 
{ final TypeInformation<?> type = declarer.getOutputType(stream); - if (numberOfAttributes == 1) { - Assert.assertEquals(type.getClass(), GenericTypeInfo.class); - Assert.assertEquals(type.getTypeClass(), Object.class); - } else { - Assert.assertEquals(numberOfAttributes, type.getArity()); - Assert.assertTrue(type.isTupleType()); - } + Assert.assertEquals(numberOfAttributes, type.getArity()); + Assert.assertTrue(type.isTupleType()); } }
