Repository: flink
Updated Branches:
refs/heads/master 095dc4a54 -> 7ce9a8ff9
[FLINK-1381] Allow multiple output splitters for single stream operator
Closes #332
Conflicts:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ce9a8ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ce9a8ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ce9a8ff
Branch: refs/heads/master
Commit: 7ce9a8ff90d1ec7fff8d65c24c58e11a0aa6f445
Parents: 095dc4a
Author: mingliang <[email protected]>
Authored: Fri Jan 23 10:59:02 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sun Jan 25 15:24:40 2015 +0100
----------------------------------------------------------------------
.../flink/streaming/api/StreamConfig.java | 17 +-
.../apache/flink/streaming/api/StreamGraph.java | 13 +-
.../api/StreamingJobGraphGenerator.java | 2 +-
.../api/collector/DirectedCollectorWrapper.java | 43 +++--
.../streaming/api/datastream/DataStream.java | 30 ++-
.../datastream/SingleOutputStreamOperator.java | 23 ---
.../api/datastream/SplitDataStream.java | 5 +-
.../api/streamvertex/OutputHandler.java | 2 +-
.../flink/streaming/api/OutputSplitterTest.java | 185 +++++++++++++++++++
.../flink/streaming/api/scala/DataStream.scala | 7 +-
10 files changed, 257 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 94349d7..1d51216 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -186,17 +186,22 @@ public class StreamConfig implements Serializable {
return config.getBoolean(DIRECTED_EMIT, false);
}
- public void setOutputSelector(OutputSelector<?> outputSelector) {
- if (outputSelector != null) {
- setDirectedEmit(true);
- config.setBytes(OUTPUT_SELECTOR,
SerializationUtils.serialize(outputSelector));
+
+ /**
+ * Stores the output selectors of this vertex and enables directed emit,
+ * but only if at least one selector is present.
+ *
+ * @param outputSelector
+ * The selectors of the vertex; may be null or empty, in which
+ * case nothing is stored and directed emit is left untouched.
+ */
+ public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+ try {
+ // StreamGraph creates an empty selector list for the vertices it
+ // registers, so a null check alone would enable directed emit for
+ // vertices that have no selector at all.
+ if (outputSelector != null && !outputSelector.isEmpty()) {
+ setDirectedEmit(true);
+ config.setBytes(OUTPUT_SELECTOR,
SerializationUtils.serialize((Serializable) outputSelector));
+ }
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize OutputSelector", e);
}
}
@SuppressWarnings("unchecked")
- public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
+ public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
try {
- return (OutputSelector<T>)
InstantiationUtil.readObjectFromConfig(this.config,
+ return (List<OutputSelector<T>>)
InstantiationUtil.readObjectFromConfig(this.config,
OUTPUT_SELECTOR, cl);
} catch (Exception e) {
throw new StreamVertexException("Cannot deserialize and
instantiate OutputSelector", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index c9ecd55..2a0d0c7 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -66,8 +66,8 @@ public class StreamGraph {
private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
- private Map<String, OutputSelector<?>> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>>
jobVertexClasses;
+ private Map<String, List<OutputSelector<?>>> outputSelectors;
private Map<String, Integer> iterationIds;
private Map<Integer, String> iterationIDtoHeadName;
private Map<Integer, String> iterationIDtoTailName;
@@ -101,7 +101,7 @@ public class StreamGraph {
typeSerializersIn2 = new HashMap<String,
StreamRecordSerializer<?>>();
typeSerializersOut1 = new HashMap<String,
StreamRecordSerializer<?>>();
typeSerializersOut2 = new HashMap<String,
StreamRecordSerializer<?>>();
- outputSelectors = new HashMap<String, OutputSelector<?>>();
+ outputSelectors = new HashMap<String,
List<OutputSelector<?>>>();
jobVertexClasses = new HashMap<String, Class<? extends
AbstractInvokable>>();
iterationIds = new HashMap<String, Integer>();
iterationIDtoHeadName = new HashMap<Integer, String>();
@@ -272,6 +272,7 @@ public class StreamGraph {
outEdgeLists.put(vertexName, new ArrayList<String>());
outEdgeTypes.put(vertexName, new ArrayList<Integer>());
selectedNames.put(vertexName, new ArrayList<List<String>>());
+ // start with an empty selector list; setOutputSelector appends to it
+ outputSelectors.put(vertexName, new
ArrayList<OutputSelector<?>>());
inEdgeLists.put(vertexName, new ArrayList<String>());
outputPartitioners.put(vertexName, new
ArrayList<StreamPartitioner<?>>());
iterationTailCount.put(vertexName, 0);
@@ -385,10 +386,10 @@ public class StreamGraph {
* @param vertexName
* Name of the vertex for which the output selector will be
set
* @param outputSelector
- * The outputselector object
+ * The user defined output selector. It is appended to the
+ * selectors already registered for the vertex instead of
+ * replacing them.
*/
- public void setOutputSelector(String vertexName, OutputSelector<?>
outputSelector) {
- outputSelectors.put(vertexName, outputSelector);
+ public <T> void setOutputSelector(String vertexName, OutputSelector<T>
outputSelector) {
+ // NOTE(review): throws NullPointerException if no selector list was
+ // created for vertexName (Map.get returns null for unknown keys).
+ outputSelectors.get(vertexName).add(outputSelector);
if (LOG.isDebugEnabled()) {
LOG.debug("Outputselector set for {}", vertexName);
@@ -520,7 +521,7 @@ public class StreamGraph {
return inputFormatLists.get(vertexName);
}
- public OutputSelector<?> getOutputSelector(String vertexName) {
+ /**
+ * Returns the output selectors registered for the given vertex, in the
+ * order they were added. The returned list is the graph's own list, not
+ * a copy; it is null for a vertex name that has no entry.
+ */
+ public List<OutputSelector<?>> getOutputSelector(String vertexName) {
return outputSelectors.get(vertexName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 3b2d135..fccb1e1 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -191,7 +191,7 @@ public class StreamingJobGraphGenerator {
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
config.setUserInvokable(streamGraph.getInvokable(vertexName));
-
config.setOutputSelector(streamGraph.getOutputSelector(vertexName));
+ // NOTE(review): StreamGraph initializes an empty selector list per vertex,
+ // so this may pass an empty list rather than null; confirm that
+ // setOutputSelectors does not enable directed emit for an empty list.
+
config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
config.setOperatorStates(streamGraph.getState(vertexName));
config.setNumberOfOutputs(nonChainableOutputs.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
index 66fb667..4681cd3 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
@@ -40,7 +40,7 @@ public class DirectedCollectorWrapper<OUT> extends
CollectorWrapper<OUT> {
private static final Logger LOG =
LoggerFactory.getLogger(DirectedCollectorWrapper.class);
- OutputSelector<OUT> outputSelector;
+ List<OutputSelector<OUT>> outputSelectors;
protected Map<String, List<Collector<OUT>>> outputMap;
@@ -53,8 +53,8 @@ public class DirectedCollectorWrapper<OUT> extends
CollectorWrapper<OUT> {
- * @param outputSelector
- * User defined {@link OutputSelector}
+ * @param outputSelectors
+ * User defined {@link OutputSelector}s; every record is routed by all of them
*/
- public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) {
- this.outputSelector = outputSelector;
+ public DirectedCollectorWrapper(List<OutputSelector<OUT>>
outputSelectors) {
+ this.outputSelectors = outputSelectors;
this.emitted = new HashSet<Collector<OUT>>();
this.selectAllOutputs = new LinkedList<Collector<OUT>>();
this.outputMap = new HashMap<String, List<Collector<OUT>>>();
@@ -91,34 +91,37 @@ public class DirectedCollectorWrapper<OUT> extends
CollectorWrapper<OUT> {
public void collect(OUT record) {
emitted.clear();
- Iterable<String> outputNames = outputSelector.select(record);
-
for (Collector<OUT> output : selectAllOutputs) {
output.collect(record);
emitted.add(output);
}
- for (String outputName : outputNames) {
- List<Collector<OUT>> outputList =
outputMap.get(outputName);
- if (outputList == null) {
- if (LOG.isErrorEnabled()) {
- String format = String.format(
- "Cannot emit because no
output is selected with the name: %s",
- outputName);
- LOG.error(format);
+ // Route the record through every selector; the emitted set ensures that a
+ // collector chosen by several selectors receives the record at most once.
+ for (OutputSelector<OUT> outputSelector : outputSelectors) {
+ Iterable<String> outputNames =
outputSelector.select(record);
- }
- } else {
- for (Collector<OUT> output : outputList) {
- if (!emitted.contains(output)) {
- output.collect(record);
- emitted.add(output);
+ for (String outputName : outputNames) {
+ List<Collector<OUT>> outputList =
outputMap.get(outputName);
+ if (outputList == null) {
+ LOG.error("Cannot emit because no output is selected with the name: {}",
+ outputName);
+ } else {
+ for (Collector<OUT> output :
outputList) {
+ if (!emitted.contains(output)) {
+ output.collect(record);
+ emitted.add(output);
+ }
}
}
}
-
}
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0f633ec..b30d261 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.StreamGraph;
+import org.apache.flink.streaming.api.collector.OutputSelector;
import
org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
import
org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -225,6 +226,25 @@ public class DataStream<OUT> {
}
/**
+ * Operator used for directing tuples to specific named outputs using an
+ * {@link org.apache.flink.streaming.api.collector.OutputSelector}.
Calling
+ * this method on an operator creates a new {@link SplitDataStream}.
+ * <p>
+ * This method may be called more than once on the same stream: each call
+ * adds another selector, and every record is sent to the union of the
+ * outputs selected by all of them. On a merged stream the selector is
+ * registered for each of the merged inputs.
+ *
+ * @param outputSelector
+ * The user defined
+ * {@link
org.apache.flink.streaming.api.collector.OutputSelector}
+ * for directing the tuples.
+ * @return The {@link SplitDataStream}
+ */
+ public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+ for (DataStream<OUT> ds : this.mergedStreams) {
+ streamGraph.setOutputSelector(ds.getId(),
clean(outputSelector));
+ }
+
+ return new SplitDataStream<OUT>(this);
+ }
+
+ /**
* Creates a new {@link ConnectedDataStream} by connecting
* {@link DataStream} outputs of different type with each other. The
* DataStreams connected using this operators can be used with
CoFunctions.
@@ -382,8 +402,7 @@ public class DataStream<OUT> {
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is
to use
* output splitting to send a part of the closing data stream to the
head.
- * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
- * more information.
+ * Refer to {@link #split(OutputSelector)} for more information.
* <p>
* The iteration edge will be partitioned the same way as the first
input of
* the iteration head.
@@ -408,8 +427,7 @@ public class DataStream<OUT> {
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is
to use
* output splitting to send a part of the closing data stream to the
head.
- * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
- * more information.
+ * Refer to {@link #split(OutputSelector)} for more information.
* <p>
* The iteration edge will be partitioned the same way as the first
input of
* the iteration head.
@@ -1176,8 +1194,8 @@ public class DataStream<OUT> {
DataStreamSink<OUT> returnStream = new
DataStreamSink<OUT>(environment, "sink", getType(),
sinkInvokable);
- streamGraph.addStreamVertex(returnStream.getId(),
sinkInvokable, getType(), null,
- "sink", degreeOfParallelism);
+ streamGraph.addStreamVertex(returnStream.getId(),
sinkInvokable, getType(), null, "sink",
+ degreeOfParallelism);
this.connectGraph(this.copy(), returnStream.getId(), 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index dbfbc48..dcfd6fe 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,7 +22,6 @@ import java.util.Map.Entry;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -101,28 +100,6 @@ public class SingleOutputStreamOperator<OUT, O extends
SingleOutputStreamOperato
}
+ // NOTE(review): split(OutputSelector) now lives on DataStream; check whether
+ // the isSplit flag that only the removed split() read is still used anywhere.
/**
- * Operator used for directing tuples to specific named outputs using an
- * {@link OutputSelector}. Calling this method on an operator creates a
new
- * {@link SplitDataStream}.
- *
- * @param outputSelector
- * The user defined {@link OutputSelector} for directing the
- * tuples.
- * @return The {@link SplitDataStream}
- */
- public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
- if (!isSplit) {
- this.isSplit = true;
- streamGraph.setOutputSelector(id,
clean(outputSelector));
-
- return new SplitDataStream<OUT>(this);
- } else {
- throw new RuntimeException("Currently operators can
only be split once");
- }
-
- }
-
- /**
* This is a beta feature </br></br> Register an operator state for this
* operator by the given name. This name can be used to retrieve the
state
* during runtime using {@link
StreamingRuntimeContext#getState(String)}. To
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 2b5b7c5..97458a8 100755
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -56,7 +56,10 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
}
DataStream<OUT> returnStream = copy();
- returnStream.userDefinedNames = Arrays.asList(outputNames);
+
+ // Set the names on every stream merged into the copy, matching
+ // DataStream#split, which registers the selector for each merged stream.
+ for (DataStream<OUT> ds : returnStream.mergedStreams) {
+ ds.userDefinedNames = Arrays.asList(outputNames);
+ }
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 0d60939..1a12cb2 100644
---
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -108,7 +108,7 @@ public class OutputHandler<OUT> {
// We create a wrapper that will encapsulate the chained
operators and
// network outputs
- CollectorWrapper<OUT> wrapper = isDirectEmit ? new
DirectedCollectorWrapper(
- chainedTaskConfig.getOutputSelector(cl)) : new
CollectorWrapper<OUT>();
+ // Parameterized rather than raw, so the selector list is type-checked against OUT.
+ CollectorWrapper<OUT> wrapper = isDirectEmit ? new
DirectedCollectorWrapper<OUT>(
+ chainedTaskConfig.<OUT> getOutputSelectors(cl)) : new
CollectorWrapper<OUT>();
// Create collectors for the network outputs
for (String output : chainedTaskConfig.getOutputs(cl)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
new file mode 100644
index 0000000..2486715
--- /dev/null
+++
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests that several {@link OutputSelector}s can be applied to the same
+ * stream, both for a merged stream and for a single source stream.
+ */
+public class OutputSplitterTest {
+
+	private static final long MEMORYSIZE = 32;
+
+	// Filled by the sinks of the job under test and checked after env.execute().
+	private static ArrayList<Integer> splitterResult1 = new ArrayList<Integer>();
+	private static ArrayList<Integer> splitterResult2 = new ArrayList<Integer>();
+
+	/**
+	 * Splits a merged stream with two independent selectors and checks that
+	 * each selected output receives exactly the matching elements of both inputs.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOnMergedDataStream() throws Exception {
+		splitterResult1.clear();
+		splitterResult2.clear();
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
+		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
+
+		d1 = d1.merge(d2);
+
+		// first selector: values greater than 4 go to ">", the rest to "<"
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 8354166915727490130L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value > 4) {
+					s.add(">");
+				} else {
+					s.add("<");
+				}
+				return s;
+			}
+		}).select(">").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = 5827187510526388104L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult1.add(value);
+			}
+		});
+
+		// second selector on the same stream: multiples of 3 go to "yes"
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = -6822487543355994807L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 3 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(new SinkFunction<Integer>() {
+			private static final long serialVersionUID = -2674335071267854599L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult2.add(value);
+			}
+		});
+		env.execute();
+
+		Collections.sort(splitterResult1);
+		Collections.sort(splitterResult2);
+
+		assertEquals(Arrays.asList(5, 6, 7, 8, 9), splitterResult1);
+		assertEquals(Arrays.asList(0, 3, 6, 9), splitterResult2);
+	}
+
+	/**
+	 * Splits a single source stream with two independent selectors and checks
+	 * that each selected output receives exactly the matching elements.
+	 */
+	@Test
+	public void testOnSingleDataStream() throws Exception {
+		splitterResult1.clear();
+		splitterResult2.clear();
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+		// first selector: even values go to "even", odd values to "odd"
+		ds.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 2524335410904414121L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 2 == 0) {
+					s.add("even");
+				} else {
+					s.add("odd");
+				}
+				return s;
+			}
+		}).select("even").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = -2995092337537209535L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult1.add(value);
+			}
+		});
+
+		// second selector on the same stream: multiples of 4 go to "yes"
+		ds.split(new OutputSelector<Integer>() {
+
+			private static final long serialVersionUID = -511693919586034092L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 4 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(new SinkFunction<Integer>() {
+
+			private static final long serialVersionUID = -1749077049727705424L;
+
+			@Override
+			public void invoke(Integer value) {
+				splitterResult2.add(value);
+			}
+		});
+		env.execute();
+
+		Collections.sort(splitterResult1);
+		Collections.sort(splitterResult2);
+
+		assertEquals(Arrays.asList(0, 2, 4, 6, 8), splitterResult1);
+		assertEquals(Arrays.asList(0, 4, 8), splitterResult2);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 698b193..177a9ee 100644
---
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -468,12 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* OutputSelector. Calling this method on an operator creates a new
* SplitDataStream.
+ * May be called several times on the same stream; each call registers an
+ * additional selector on the underlying Java stream.
*/
- def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream
match {
- case op: SingleOutputStreamOperator[_, _] => op.split(selector)
- case _ =>
- throw new UnsupportedOperationException("Operator " +
javaStream.toString + " can not be " +
- "split.")
- }
+ def split(selector: OutputSelector[T]): SplitDataStream[T] =
javaStream.split(selector)
/**
* Creates a new SplitDataStream that contains only the elements satisfying
the