Repository: flink
Updated Branches:
  refs/heads/master 095dc4a54 -> 7ce9a8ff9


[FLINK-1381] Allow multiple output splitters for single stream operator

Closes #332

Conflicts:
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
        
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ce9a8ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ce9a8ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ce9a8ff

Branch: refs/heads/master
Commit: 7ce9a8ff90d1ec7fff8d65c24c58e11a0aa6f445
Parents: 095dc4a
Author: mingliang <[email protected]>
Authored: Fri Jan 23 10:59:02 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sun Jan 25 15:24:40 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  17 +-
 .../apache/flink/streaming/api/StreamGraph.java |  13 +-
 .../api/StreamingJobGraphGenerator.java         |   2 +-
 .../api/collector/DirectedCollectorWrapper.java |  43 +++--
 .../streaming/api/datastream/DataStream.java    |  30 ++-
 .../datastream/SingleOutputStreamOperator.java  |  23 ---
 .../api/datastream/SplitDataStream.java         |   5 +-
 .../api/streamvertex/OutputHandler.java         |   2 +-
 .../flink/streaming/api/OutputSplitterTest.java | 185 +++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |   7 +-
 10 files changed, 257 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 94349d7..1d51216 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -186,17 +186,22 @@ public class StreamConfig implements Serializable {
                return config.getBoolean(DIRECTED_EMIT, false);
        }
 
-       public void setOutputSelector(OutputSelector<?> outputSelector) {
-               if (outputSelector != null) {
-                       setDirectedEmit(true);
-                       config.setBytes(OUTPUT_SELECTOR, 
SerializationUtils.serialize(outputSelector));
+
+       public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+               try {
+                       if (outputSelector != null) {
+                               setDirectedEmit(true);
+                               config.setBytes(OUTPUT_SELECTOR, 
SerializationUtils.serialize((Serializable) outputSelector));
+                       }
+               } catch (SerializationException e) {
+                       throw new RuntimeException("Cannot serialize 
OutputSelector");
                }
        }
 
        @SuppressWarnings("unchecked")
-       public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
+       public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
                try {
-                       return (OutputSelector<T>) 
InstantiationUtil.readObjectFromConfig(this.config,
+                       return (List<OutputSelector<T>>) 
InstantiationUtil.readObjectFromConfig(this.config,
                                        OUTPUT_SELECTOR, cl);
                } catch (Exception e) {
                        throw new StreamVertexException("Cannot deserialize and 
instantiate OutputSelector", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index c9ecd55..2a0d0c7 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -66,8 +66,8 @@ public class StreamGraph {
        private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
        private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
        private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-       private Map<String, OutputSelector<?>> outputSelectors;
        private Map<String, Class<? extends AbstractInvokable>> 
jobVertexClasses;
+       private Map<String, List<OutputSelector<?>>> outputSelectors;
        private Map<String, Integer> iterationIds;
        private Map<Integer, String> iterationIDtoHeadName;
        private Map<Integer, String> iterationIDtoTailName;
@@ -101,7 +101,7 @@ public class StreamGraph {
                typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
                typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
                typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               outputSelectors = new HashMap<String, OutputSelector<?>>();
+               outputSelectors = new HashMap<String, 
List<OutputSelector<?>>>();
                jobVertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
                iterationIds = new HashMap<String, Integer>();
                iterationIDtoHeadName = new HashMap<Integer, String>();
@@ -272,6 +272,7 @@ public class StreamGraph {
                outEdgeLists.put(vertexName, new ArrayList<String>());
                outEdgeTypes.put(vertexName, new ArrayList<Integer>());
                selectedNames.put(vertexName, new ArrayList<List<String>>());
+               outputSelectors.put(vertexName, new 
ArrayList<OutputSelector<?>>());
                inEdgeLists.put(vertexName, new ArrayList<String>());
                outputPartitioners.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
                iterationTailCount.put(vertexName, 0);
@@ -385,10 +386,10 @@ public class StreamGraph {
         * @param vertexName
         *            Name of the vertex for which the output selector will be 
set
         * @param outputSelector
-        *            The outputselector object
+        *            The user defined output selector.
         */
-       public void setOutputSelector(String vertexName, OutputSelector<?> 
outputSelector) {
-               outputSelectors.put(vertexName, outputSelector);
+       public <T> void setOutputSelector(String vertexName, OutputSelector<T> 
outputSelector) {
+               outputSelectors.get(vertexName).add(outputSelector);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Outputselector set for {}", vertexName);
@@ -520,7 +521,7 @@ public class StreamGraph {
                return inputFormatLists.get(vertexName);
        }
 
-       public OutputSelector<?> getOutputSelector(String vertexName) {
+       public List<OutputSelector<?>> getOutputSelector(String vertexName) {
                return outputSelectors.get(vertexName);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 3b2d135..fccb1e1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -191,7 +191,7 @@ public class StreamingJobGraphGenerator {
                
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
 
                config.setUserInvokable(streamGraph.getInvokable(vertexName));
-               
config.setOutputSelector(streamGraph.getOutputSelector(vertexName));
+               
config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
                config.setOperatorStates(streamGraph.getState(vertexName));
 
                config.setNumberOfOutputs(nonChainableOutputs.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
index 66fb667..4681cd3 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
@@ -40,7 +40,7 @@ public class DirectedCollectorWrapper<OUT> extends 
CollectorWrapper<OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DirectedCollectorWrapper.class);
 
-       OutputSelector<OUT> outputSelector;
+       List<OutputSelector<OUT>> outputSelectors;
 
        protected Map<String, List<Collector<OUT>>> outputMap;
 
@@ -53,8 +53,8 @@ public class DirectedCollectorWrapper<OUT> extends 
CollectorWrapper<OUT> {
         * @param outputSelector
         *            User defined {@link OutputSelector}
         */
-       public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) {
-               this.outputSelector = outputSelector;
+       public DirectedCollectorWrapper(List<OutputSelector<OUT>> 
outputSelectors) {
+               this.outputSelectors = outputSelectors;
                this.emitted = new HashSet<Collector<OUT>>();
                this.selectAllOutputs = new LinkedList<Collector<OUT>>();
                this.outputMap = new HashMap<String, List<Collector<OUT>>>();
@@ -91,34 +91,37 @@ public class DirectedCollectorWrapper<OUT> extends 
CollectorWrapper<OUT> {
        public void collect(OUT record) {
                emitted.clear();
 
-               Iterable<String> outputNames = outputSelector.select(record);
-
                for (Collector<OUT> output : selectAllOutputs) {
                        output.collect(record);
                        emitted.add(output);
                }
 
-               for (String outputName : outputNames) {
-                       List<Collector<OUT>> outputList = 
outputMap.get(outputName);
-                       if (outputList == null) {
-                               if (LOG.isErrorEnabled()) {
-                                       String format = String.format(
-                                                       "Cannot emit because no 
output is selected with the name: %s",
-                                                       outputName);
-                                       LOG.error(format);
+               for (OutputSelector<OUT> outputSelector : outputSelectors) {
+                       Iterable<String> outputNames = 
outputSelector.select(record);
 
-                               }
-                       } else {
-                               for (Collector<OUT> output : outputList) {
-                                       if (!emitted.contains(output)) {
-                                               output.collect(record);
-                                               emitted.add(output);
+                       for (String outputName : outputNames) {
+                               List<Collector<OUT>> outputList = 
outputMap.get(outputName);
+                               if (outputList == null) {
+                                       if (LOG.isErrorEnabled()) {
+                                               String format = String.format(
+                                                               "Cannot emit 
because no output is selected with the name: %s",
+                                                               outputName);
+                                               LOG.error(format);
+
+                                       }
+                               } else {
+                                       for (Collector<OUT> output : 
outputList) {
+                                               if (!emitted.contains(output)) {
+                                                       output.collect(record);
+                                                       emitted.add(output);
+                                               }
                                        }
+
                                }
 
                        }
-
                }
+
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0f633ec..b30d261 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.StreamGraph;
+import org.apache.flink.streaming.api.collector.OutputSelector;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -225,6 +226,25 @@ public class DataStream<OUT> {
        }
 
        /**
+        * Operator used for directing tuples to specific named outputs using an
+        * {@link org.apache.flink.streaming.api.collector.OutputSelector}. 
Calling
+        * this method on an operator creates a new {@link SplitDataStream}.
+        * 
+        * @param outputSelector
+        *            The user defined
+        *            {@link 
org.apache.flink.streaming.api.collector.OutputSelector}
+        *            for directing the tuples.
+        * @return The {@link SplitDataStream}
+        */
+       public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+               for (DataStream<OUT> ds : this.mergedStreams) {
+                       streamGraph.setOutputSelector(ds.getId(), 
clean(outputSelector));
+               }
+
+               return new SplitDataStream<OUT>(this);
+       }
+
+       /**
         * Creates a new {@link ConnectedDataStream} by connecting
         * {@link DataStream} outputs of different type with each other. The
         * DataStreams connected using this operators can be used with 
CoFunctions.
@@ -382,8 +402,7 @@ public class DataStream<OUT> {
         * the data stream that will be fed back and used as the input for the
         * iteration head. A common usage pattern for streaming iterations is 
to use
         * output splitting to send a part of the closing data stream to the 
head.
-        * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-        * more information.
+        * Refer to {@link #split(OutputSelector)} for more information.
         * <p>
         * The iteration edge will be partitioned the same way as the first 
input of
         * the iteration head.
@@ -408,8 +427,7 @@ public class DataStream<OUT> {
         * the data stream that will be fed back and used as the input for the
         * iteration head. A common usage pattern for streaming iterations is 
to use
         * output splitting to send a part of the closing data stream to the 
head.
-        * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-        * more information.
+        * Refer to {@link #split(OutputSelector)} for more information.
         * <p>
         * The iteration edge will be partitioned the same way as the first 
input of
         * the iteration head.
@@ -1176,8 +1194,8 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType(),
                                sinkInvokable);
 
-               streamGraph.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null,
-                               "sink", degreeOfParallelism);
+               streamGraph.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null, "sink",
+                               degreeOfParallelism);
 
                this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index dbfbc48..dcfd6fe 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,7 +22,6 @@ import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -101,28 +100,6 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
        }
 
        /**
-        * Operator used for directing tuples to specific named outputs using an
-        * {@link OutputSelector}. Calling this method on an operator creates a 
new
-        * {@link SplitDataStream}.
-        * 
-        * @param outputSelector
-        *            The user defined {@link OutputSelector} for directing the
-        *            tuples.
-        * @return The {@link SplitDataStream}
-        */
-       public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-               if (!isSplit) {
-                       this.isSplit = true;
-                       streamGraph.setOutputSelector(id, 
clean(outputSelector));
-
-                       return new SplitDataStream<OUT>(this);
-               } else {
-                       throw new RuntimeException("Currently operators can 
only be split once");
-               }
-
-       }
-
-       /**
         * This is a beta feature </br></br> Register an operator state for this
         * operator by the given name. This name can be used to retrieve the 
state
         * during runtime using {@link 
StreamingRuntimeContext#getState(String)}. To

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 2b5b7c5..97458a8 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -56,7 +56,10 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
                }
 
                DataStream<OUT> returnStream = copy();
-               returnStream.userDefinedNames = Arrays.asList(outputNames);
+
+               for (DataStream<OUT> ds : returnStream.mergedStreams) {
+                       ds.userDefinedNames = Arrays.asList(outputNames);
+               }
                return returnStream;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 0d60939..1a12cb2 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -108,7 +108,7 @@ public class OutputHandler<OUT> {
                // We create a wrapper that will encapsulate the chained 
operators and
                // network outputs
                CollectorWrapper<OUT> wrapper = isDirectEmit ? new 
DirectedCollectorWrapper(
-                               chainedTaskConfig.getOutputSelector(cl)) : new 
CollectorWrapper<OUT>();
+                               chainedTaskConfig.getOutputSelectors(cl)) : new 
CollectorWrapper<OUT>();
 
                // Create collectors for the network outputs
                for (String output : chainedTaskConfig.getOutputs(cl)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
new file mode 100644
index 0000000..2486715
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class OutputSplitterTest {
+
+       private static final long MEMORYSIZE = 32;
+
+       private static ArrayList<Integer> splitterResult1 = new 
ArrayList<Integer>();
+       private static ArrayList<Integer> splitterResult2 = new 
ArrayList<Integer>();
+
+
+       private static ArrayList<Integer> expectedSplitterResult = new 
ArrayList<Integer>();
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testOnMergedDataStream() throws Exception {
+               splitterResult1.clear();
+               splitterResult2.clear();
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
+               env.setBufferTimeout(1);
+
+               DataStream<Integer> d1 = env.fromElements(0,2,4,6,8);
+               DataStream<Integer> d2 = env.fromElements(1,3,5,7,9);
+
+               d1 = d1.merge(d2);
+
+               d1.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = 
8354166915727490130L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value > 4) {
+                                       s.add(">");
+                               } else {
+                                       s.add("<");
+                               }
+                               return s;
+                       }
+               }).select(">").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = 
5827187510526388104L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult1.add(value);
+                       }
+               });
+
+               d1.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = 
-6822487543355994807L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 3 == 0) {
+                                       s.add("yes");
+                               } else {
+                                       s.add("no");
+                               }
+                               return s;
+                       }
+               }).select("yes").addSink(new SinkFunction<Integer>() {
+                       private static final long serialVersionUID = 
-2674335071267854599L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult2.add(value);
+                       }
+               });
+               env.execute();
+
+               Collections.sort(splitterResult1);
+               Collections.sort(splitterResult2);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(5,6,7,8,9));
+               assertEquals(expectedSplitterResult, splitterResult1);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,3,6,9));
+               assertEquals(expectedSplitterResult, splitterResult2);
+       }
+
+       @Test
+       public void testOnSingleDataStream() throws Exception {
+               splitterResult1.clear();
+               splitterResult2.clear();
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
+               env.setBufferTimeout(1);
+
+               DataStream<Integer> ds = env.fromElements(0,1,2,3,4,5,6,7,8,9);
+
+               ds.split(new OutputSelector<Integer>() {
+                       private static final long serialVersionUID = 
2524335410904414121L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 2 == 0) {
+                                       s.add("even");
+                               } else {
+                                       s.add("odd");
+                               }
+                               return s;
+                       }
+               }).select("even").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = 
-2995092337537209535L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult1.add(value);
+                       }
+               });
+
+               ds.split(new OutputSelector<Integer>() {
+
+                       private static final long serialVersionUID = 
-511693919586034092L;
+
+                       @Override
+                       public Iterable<String> select(Integer value) {
+                               List<String> s = new ArrayList<String>();
+                               if (value % 4 == 0) {
+                                       s.add("yes");
+                               } else {
+                                       s.add("no");
+                               }
+                               return s;
+                       }
+               }).select("yes").addSink(new SinkFunction<Integer>() {
+
+                       private static final long serialVersionUID = 
-1749077049727705424L;
+
+                       @Override
+                       public void invoke(Integer value) {
+                               splitterResult2.add(value);
+                       }
+               });
+               env.execute();
+
+               Collections.sort(splitterResult1);
+               Collections.sort(splitterResult2);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,2,4,6,8));
+               assertEquals(expectedSplitterResult, splitterResult1);
+
+               expectedSplitterResult.clear();
+               expectedSplitterResult.addAll(Arrays.asList(0,4,8));
+               assertEquals(expectedSplitterResult, splitterResult2);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce9a8ff/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 698b193..177a9ee 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -468,12 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * OutputSelector. Calling this method on an operator creates a new
    * SplitDataStream.
    */
-  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream 
match {
-    case op: SingleOutputStreamOperator[_, _] => op.split(selector)
-    case _ =>
-      throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " can not be " +
-        "split.")
-  }
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = 
javaStream.split(selector)
 
   /**
    * Creates a new SplitDataStream that contains only the elements satisfying 
the

Reply via email to