[FLINK-1345] [streaming] Added support for chaining operators with directed 
outputs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92947f0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92947f0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92947f0d

Branch: refs/heads/master
Commit: 92947f0d8308afa74b666e11d5ec3af08f0198c1
Parents: e3b608c
Author: Gyula Fora <[email protected]>
Authored: Wed Jan 21 13:41:44 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  23 ++-
 .../apache/flink/streaming/api/StreamGraph.java |  22 ++-
 .../api/StreamingJobGraphGenerator.java         |  18 +--
 .../api/collector/CollectorWrapper.java         |   7 +-
 .../api/collector/DirectedCollectorWrapper.java | 128 +++++++++++++++++
 .../api/collector/DirectedOutputWrapper.java    | 144 -------------------
 .../streaming/api/collector/StreamOutput.java   |  56 ++++----
 .../api/collector/StreamOutputWrapper.java      | 116 ---------------
 .../streaming/api/datastream/DataStream.java    |   5 +-
 .../api/datastream/SplitDataStream.java         |   7 +-
 .../environment/StreamExecutionEnvironment.java |   2 +-
 .../api/streamvertex/OutputHandler.java         |  95 +++++-------
 .../api/streamvertex/StreamIterationHead.java   |  12 +-
 .../api/streamvertex/StreamIterationTail.java   |   3 +-
 .../api/collector/StreamCollectorTest.java      |  96 ++++++-------
 15 files changed, 283 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 213a892..94349d7 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +47,6 @@ public class StreamConfig implements Serializable {
        private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
        private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
        private static final String OUTPUT_NAME = "outputName_";
-       private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
        private static final String PARTITIONER_OBJECT = "partitionerObject_";
        private static final String VERTEX_NAME = "vertexName";
        private static final String ITERATION_ID = "iteration-id";
@@ -241,25 +241,18 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       public void setSelectAll(String output, Boolean selectAll) {
-               if (selectAll != null) {
-                       config.setBoolean(OUTPUT_SELECT_ALL + output, 
selectAll);
-               }
-       }
-
-       public boolean isSelectAll(String output) {
-               return config.getBoolean(OUTPUT_SELECT_ALL + output, true);
-       }
-
-       public void setOutputNames(String output, List<String> outputName) {
-               if (outputName != null) {
+       public void setSelectedNames(String output, List<String> selected) {
+               if (selected != null) {
+                       config.setBytes(OUTPUT_NAME + output,
+                                       
SerializationUtils.serialize((Serializable) selected));
+               } else {
                        config.setBytes(OUTPUT_NAME + output,
-                                       
SerializationUtils.serialize((Serializable) outputName));
+                                       
SerializationUtils.serialize((Serializable) new ArrayList<String>()));
                }
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getOutputNames(String output) {
+       public List<String> getSelectedNames(String output) {
                return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
                                null));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 3dd1bdb..c9ecd55 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -58,7 +58,6 @@ public class StreamGraph {
        private Map<String, List<String>> outEdgeLists;
        private Map<String, List<Integer>> outEdgeTypes;
        private Map<String, List<List<String>>> selectedNames;
-       private Map<String, List<Boolean>> outEdgeSelectAlls;
        private Map<String, List<String>> inEdgeLists;
        private Map<String, List<StreamPartitioner<?>>> outputPartitioners;
        private Map<String, String> operatorNames;
@@ -94,7 +93,6 @@ public class StreamGraph {
                outEdgeLists = new HashMap<String, List<String>>();
                outEdgeTypes = new HashMap<String, List<Integer>>();
                selectedNames = new HashMap<String, List<List<String>>>();
-               outEdgeSelectAlls = new HashMap<String, List<Boolean>>();
                inEdgeLists = new HashMap<String, List<String>>();
                outputPartitioners = new HashMap<String, 
List<StreamPartitioner<?>>>();
                operatorNames = new HashMap<String, String>();
@@ -186,7 +184,7 @@ public class StreamGraph {
 
                setEdge(vertexName, iterationHead,
                                
outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
-                               new ArrayList<String>(), false);
+                               new ArrayList<String>());
 
                iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), 
waitTime);
 
@@ -274,7 +272,6 @@ public class StreamGraph {
                outEdgeLists.put(vertexName, new ArrayList<String>());
                outEdgeTypes.put(vertexName, new ArrayList<Integer>());
                selectedNames.put(vertexName, new ArrayList<List<String>>());
-               outEdgeSelectAlls.put(vertexName, new ArrayList<Boolean>());
                inEdgeLists.put(vertexName, new ArrayList<String>());
                outputPartitioners.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
                iterationTailCount.put(vertexName, 0);
@@ -296,14 +293,12 @@ public class StreamGraph {
         *            User defined names of the out edge
         */
        public void setEdge(String upStreamVertexName, String 
downStreamVertexName,
-                       StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames,
-                       boolean selectAll) {
+                       StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames) {
                outEdgeLists.get(upStreamVertexName).add(downStreamVertexName);
                outEdgeTypes.get(upStreamVertexName).add(typeNumber);
                inEdgeLists.get(downStreamVertexName).add(upStreamVertexName);
                
outputPartitioners.get(upStreamVertexName).add(partitionerObject);
                selectedNames.get(upStreamVertexName).add(outputNames);
-               outEdgeSelectAlls.get(upStreamVertexName).add(selectAll);
        }
 
        private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
@@ -494,16 +489,15 @@ public class StreamGraph {
                return outEdgeTypes.get(vertexName);
        }
 
-       public StreamPartitioner<?> getOutPartitioner(String vertexName, int 
outputIndex) {
-               return outputPartitioners.get(vertexName).get(outputIndex);
+       public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, 
String downStreamVertex) {
+               return outputPartitioners.get(upStreamVertex).get(
+                               
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
        }
 
-       public List<String> getSelectedNames(String vertexName, int 
outputIndex) {
-               return selectedNames.get(vertexName).get(outputIndex);
-       }
+       public List<String> getSelectedNames(String upStreamVertex, String 
downStreamVertex) {
 
-       public Boolean isSelectAll(String vertexName, int outputIndex) {
-               return outEdgeSelectAlls.get(vertexName).get(outputIndex);
+               return selectedNames.get(upStreamVertex).get(
+                               
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
        }
 
        public Collection<Integer> getIterationIDs() {

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 6c0cc20..3b2d135 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -206,6 +206,13 @@ public class StreamingJobGraphGenerator {
                        
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName));
                }
 
+               List<String> allOutputs = new 
ArrayList<String>(chainableOutputs);
+               allOutputs.addAll(nonChainableOutputs);
+
+               for (String output : allOutputs) {
+                       config.setSelectedNames(output, 
streamGraph.getSelectedNames(vertexName, output));
+               }
+
                vertexConfigs.put(vertexName, config);
        }
 
@@ -229,7 +236,7 @@ public class StreamingJobGraphGenerator {
                downStreamConfig.setNumberOfInputs(numOfInputs);
 
                StreamPartitioner<?> partitioner = 
streamGraph.getOutPartitioner(upStreamVertexName,
-                               outputIndex);
+                               downStreamVertexName);
 
                upStreamConfig.setPartitioner(downStreamVertexName, 
partitioner);
 
@@ -243,11 +250,6 @@ public class StreamingJobGraphGenerator {
                        LOG.debug("CONNECTED: {} - {} -> {}", 
partitioner.getClass().getSimpleName(),
                                        headOfChain, downStreamVertexName);
                }
-
-               upStreamConfig.setOutputNames(downStreamVertexName,
-                               
streamGraph.getSelectedNames(upStreamVertexName, outputIndex));
-               upStreamConfig.setSelectAll(downStreamVertexName,
-                               streamGraph.isSelectAll(upStreamVertexName, 
outputIndex));
        }
 
        private boolean isChainable(String vertexName, String outName) {
@@ -257,12 +259,10 @@ public class StreamingJobGraphGenerator {
 
                return streamGraph.getInEdges(outName).size() == 1
                                && outInvokable != null
-                               && streamGraph.getOutputSelector(vertexName) == 
null
                                && outInvokable.getChainingStrategy() == 
ChainingStrategy.ALWAYS
                                && (headInvokable.getChainingStrategy() == 
ChainingStrategy.HEAD || headInvokable
                                                .getChainingStrategy() == 
ChainingStrategy.ALWAYS)
-                               && streamGraph.getOutPartitioner(vertexName,
-                                               
streamGraph.getOutEdges(vertexName).indexOf(outName)).getStrategy() == 
PartitioningStrategy.FORWARD
+                               && streamGraph.getOutPartitioner(vertexName, 
outName).getStrategy() == PartitioningStrategy.FORWARD
                                && streamGraph.getParallelism(vertexName) == 
streamGraph.getParallelism(outName)
                                && streamGraph.chaining;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index a95973b..1281bf0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -24,14 +24,15 @@ import org.apache.flink.util.Collector;
 
 public class CollectorWrapper<OUT> implements Collector<OUT> {
 
-       List<Collector<OUT>> outputs;
+       private List<Collector<OUT>> outputs;
 
        public CollectorWrapper() {
                this.outputs = new LinkedList<Collector<OUT>>();
        }
 
-       public void addCollector(Collector<OUT> output) {
-               outputs.add(output);
+       @SuppressWarnings("unchecked")
+       public void addCollector(Collector<?> output) {
+               outputs.add((Collector<OUT>) output);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
new file mode 100755
index 0000000..66fb667
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StreamCollector that uses user defined output names and a user defined
+ * output selector to make directed emits.
+ * 
+ * @param <OUT>
+ *            Type of the Tuple collected.
+ */
+public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedCollectorWrapper.class);
+
+       OutputSelector<OUT> outputSelector;
+
+       protected Map<String, List<Collector<OUT>>> outputMap;
+
+       private List<Collector<OUT>> selectAllOutputs;
+       private Set<Collector<OUT>> emitted;
+
+       /**
+        * Creates a new DirectedStreamCollector
+        * 
+        * @param outputSelector
+        *            User defined {@link OutputSelector}
+        */
+       public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) {
+               this.outputSelector = outputSelector;
+               this.emitted = new HashSet<Collector<OUT>>();
+               this.selectAllOutputs = new LinkedList<Collector<OUT>>();
+               this.outputMap = new HashMap<String, List<Collector<OUT>>>();
+
+       }
+
+       @Override
+       public void addCollector(Collector<?> output) {
+               addCollector(output, new ArrayList<String>());
+       }
+
+       @SuppressWarnings("unchecked")
+       public void addCollector(Collector<?> output, List<String> 
selectedNames) {
+
+               if (selectedNames.isEmpty()) {
+                       selectAllOutputs.add((Collector<OUT>) output);
+               } else {
+                       for (String selectedName : selectedNames) {
+
+                               if (!outputMap.containsKey(selectedName)) {
+                                       outputMap.put(selectedName, new 
LinkedList<Collector<OUT>>());
+                                       
outputMap.get(selectedName).add((Collector<OUT>) output);
+                               } else {
+                                       if 
(!outputMap.get(selectedName).contains(output)) {
+                                               
outputMap.get(selectedName).add((Collector<OUT>) output);
+                                       }
+                               }
+
+                       }
+               }
+       }
+
+       @Override
+       public void collect(OUT record) {
+               emitted.clear();
+
+               Iterable<String> outputNames = outputSelector.select(record);
+
+               for (Collector<OUT> output : selectAllOutputs) {
+                       output.collect(record);
+                       emitted.add(output);
+               }
+
+               for (String outputName : outputNames) {
+                       List<Collector<OUT>> outputList = 
outputMap.get(outputName);
+                       if (outputList == null) {
+                               if (LOG.isErrorEnabled()) {
+                                       String format = String.format(
+                                                       "Cannot emit because no 
output is selected with the name: %s",
+                                                       outputName);
+                                       LOG.error(format);
+
+                               }
+                       } else {
+                               for (Collector<OUT> output : outputList) {
+                                       if (!emitted.contains(output)) {
+                                               output.collect(record);
+                                               emitted.add(output);
+                                       }
+                               }
+
+                       }
+
+               }
+       }
+
+       @Override
+       public void close() {
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
deleted file mode 100755
index 6ff03c4..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- * 
- * @param <OUT>
- *            Type of the Tuple collected.
- */
-public class DirectedOutputWrapper<OUT> extends StreamOutputWrapper<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedOutputWrapper.class);
-
-       OutputSelector<OUT> outputSelector;
-
-       protected Map<String, List<StreamOutput<OUT>>> outputMap;
-
-       private List<StreamOutput<OUT>> selectAllOutputs;
-       private Set<StreamOutput<OUT>> emitted;
-
-       /**
-        * Creates a new DirectedStreamCollector
-        * 
-        * @param channelID
-        *            Channel ID of the Task
-        * @param serializationDelegate
-        *            Serialization delegate used for serialization
-        * @param outputSelector
-        *            User defined {@link OutputSelector}
-        */
-       public DirectedOutputWrapper(int channelID,
-                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate,
-                       OutputSelector<OUT> outputSelector) {
-               super(channelID, serializationDelegate);
-               this.outputSelector = outputSelector;
-               this.emitted = new HashSet<StreamOutput<OUT>>();
-               this.selectAllOutputs = new LinkedList<StreamOutput<OUT>>();
-               this.outputMap = new HashMap<String, List<StreamOutput<OUT>>>();
-
-       }
-
-       @Override
-       public void addOutput(StreamOutput<OUT> output) {
-
-               super.addOutput(output);
-
-               if (output.isSelectAll()) {
-                       selectAllOutputs.add(output);
-               } else {
-                       for (String selectedName : output.getSelectedNames()) {
-                               if (selectedName != null) {
-                                       if 
(!outputMap.containsKey(selectedName)) {
-                                               outputMap.put(selectedName, new 
LinkedList<StreamOutput<OUT>>());
-                                               
outputMap.get(selectedName).add(output);
-                                       } else {
-                                               if 
(!outputMap.get(selectedName).contains(output)) {
-                                                       
outputMap.get(selectedName).add(output);
-                                               }
-                                       }
-
-                               }
-                       }
-               }
-       }
-
-       @Override
-       protected void emit() {
-               Iterable<String> outputNames = 
outputSelector.select(serializationDelegate.getInstance()
-                               .getObject());
-               emitted.clear();
-
-               for (StreamOutput<OUT> output : selectAllOutputs) {
-                       try {
-                               output.collect(serializationDelegate);
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit to {} failed due to: 
{}", output,
-                                                       
StringUtils.stringifyException(e));
-                               }
-                       }
-               }
-
-               for (String outputName : outputNames) {
-                       List<StreamOutput<OUT>> outputList = 
outputMap.get(outputName);
-                       try {
-                               if (outputList == null) {
-                                       if (LOG.isErrorEnabled()) {
-                                               String format = String.format(
-                                                               "Cannot emit 
because no output is selected with the name: %s",
-                                                               outputName);
-                                               LOG.error(format);
-
-                                       }
-                               } else {
-
-                                       for (StreamOutput<OUT> output : 
outputList) {
-                                               if (!emitted.contains(output)) {
-                                                       
output.collect(serializationDelegate);
-                                                       emitted.add(output);
-                                               }
-                                       }
-
-                               }
-
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit to {} failed due to: 
{}", outputName,
-                                                       
StringUtils.stringifyException(e));
-                               }
-                       }
-
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 6fd1b98..4551c5a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -18,37 +18,55 @@
 package org.apache.flink.streaming.api.collector;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<StreamRecord<OUT>>> {
+public class StreamOutput<OUT> implements Collector<OUT> {
 
-       private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamOutput.class);
 
-       private List<String> selectedNames;
-       private boolean selectAll = true;
+       private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+       private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
+       private StreamRecord<OUT> streamRecord;
+       private int channelID;
 
        public 
StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       List<String> selectedNames) {
+                       int channelID, SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate) {
 
-               this.output = output;
+               this.serializationDelegate = serializationDelegate;
 
-               if (selectedNames != null) {
-                       this.selectedNames = selectedNames;
-                       selectAll = false;
+               if (serializationDelegate != null) {
+                       this.streamRecord = serializationDelegate.getInstance();
+               } else {
+                       throw new RuntimeException("Serializer cannot be null");
                }
+               this.channelID = channelID;
+               this.output = output;
        }
 
-       public void collect(SerializationDelegate<StreamRecord<OUT>> record) {
+       public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
getRecordWriter() {
+               return output;
+       }
+
+       @Override
+       public void collect(OUT record) {
+               streamRecord.setObject(record);
+               streamRecord.newId(channelID);
+               serializationDelegate.setInstance(streamRecord);
+
                try {
-                       output.emit(record);
+                       output.emit(serializationDelegate);
                } catch (Exception e) {
-                       throw new RuntimeException("Could not emit record: " + 
record.getInstance());
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Emit failed due to: {}", 
StringUtils.stringifyException(e));
+                       }
                }
        }
 
@@ -65,16 +83,4 @@ public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<Stream
                }
        }
 
-       public boolean isSelectAll() {
-               return selectAll;
-       }
-
-       public List<String> getSelectedNames() {
-               return selectedNames;
-       }
-
-       public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
getRecordWriter() {
-               return output;
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
deleted file mode 100755
index c3e4c9d..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Collector for tuples in Apache Flink stream processing. The collected values
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
- * 
- * @param <OUT>
- *            Type of the Tuples/Objects collected.
- */
-public class StreamOutputWrapper<OUT> implements Collector<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamOutputWrapper.class);
-
-       protected StreamRecord<OUT> streamRecord;
-       protected int channelID;
-       protected List<StreamOutput<OUT>> outputs;
-       protected SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate;
-
-       /**
-        * Creates a new StreamCollector
-        * 
-        * @param channelID
-        *            Channel ID of the Task
-        * @param serializationDelegate
-        *            Serialization delegate used for serialization
-        */
-       public StreamOutputWrapper(int channelID,
-                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate) {
-               this.serializationDelegate = serializationDelegate;
-
-               if (serializationDelegate != null) {
-                       this.streamRecord = serializationDelegate.getInstance();
-               } else {
-                       this.streamRecord = new StreamRecord<OUT>();
-               }
-               this.channelID = channelID;
-               this.outputs = new LinkedList<StreamOutput<OUT>>();
-       }
-
-       /**
-        * Adds an output with the given user defined name
-        * 
-        * @param output
-        *            The RecordWriter object representing the output.
-        * @param outputNames
-        *            User defined names of the output.
-        * @param isSelectAllOutput
-        *            Marks whether all the outputs are selected.
-        */
-       public void addOutput(StreamOutput<OUT> output) {
-               outputs.add(output);
-       }
-
-       /**
-        * Collects and emits a tuple/object to the outputs by reusing a
-        * StreamRecord object.
-        * 
-        * @param outputObject
-        *            Object to be collected and emitted.
-        */
-       @Override
-       public void collect(OUT outputObject) {
-               streamRecord.setObject(outputObject);
-               streamRecord.newId(channelID);
-               serializationDelegate.setInstance(streamRecord);
-
-               emit();
-       }
-
-       /**
-        * Emits the current streamrecord to the outputs.
-        */
-       protected void emit() {
-               for (StreamOutput<OUT> output : outputs) {
-                       try {
-                               output.collect(serializationDelegate);
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit failed due to: {}", 
StringUtils.stringifyException(e));
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8e7d823..0f633ec 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -96,7 +96,6 @@ public class DataStream<OUT> {
        protected final String id;
        protected int degreeOfParallelism;
        protected List<String> userDefinedNames;
-       protected boolean selectAll;
        protected StreamPartitioner<OUT> partitioner;
        @SuppressWarnings("rawtypes")
        protected TypeInformation typeInfo;
@@ -127,7 +126,6 @@ public class DataStream<OUT> {
                this.degreeOfParallelism = environment.getDegreeOfParallelism();
                this.streamGraph = environment.getStreamGraph();
                this.userDefinedNames = new ArrayList<String>();
-               this.selectAll = true;
                this.partitioner = new DistributePartitioner<OUT>(true);
                this.typeInfo = typeInfo;
                this.mergedStreams = new ArrayList<DataStream<OUT>>();
@@ -145,7 +143,6 @@ public class DataStream<OUT> {
                this.id = dataStream.id;
                this.degreeOfParallelism = dataStream.degreeOfParallelism;
                this.userDefinedNames = new 
ArrayList<String>(dataStream.userDefinedNames);
-               this.selectAll = dataStream.selectAll;
                this.partitioner = dataStream.partitioner;
                this.streamGraph = dataStream.streamGraph;
                this.typeInfo = dataStream.typeInfo;
@@ -1158,7 +1155,7 @@ public class DataStream<OUT> {
        protected <X> void connectGraph(DataStream<X> inputStream, String 
outputID, int typeNumber) {
                for (DataStream<X> stream : inputStream.mergedStreams) {
                        streamGraph.setEdge(stream.getId(), outputID, 
stream.partitioner, typeNumber,
-                                       inputStream.userDefinedNames, 
inputStream.selectAll);
+                                       inputStream.userDefinedNames);
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 1bf4f9c..2b5b7c5 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -49,8 +49,13 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
        }
 
        private DataStream<OUT> selectOutput(String[] outputNames) {
+               for (String outName : outputNames) {
+                       if (outName == null) {
+                               throw new RuntimeException("Selected names must 
not be null");
+                       }
+               }
+
                DataStream<OUT> returnStream = copy();
-               returnStream.selectAll = false;
                returnStream.userDefinedNames = Arrays.asList(outputNames);
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 51dc0ae..8a9be93 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -313,7 +313,7 @@ public abstract class StreamExecutionEnvironment {
         */
        public DataStreamSource<String> socketTextStream(String hostname, int 
port, char delimiter) {
                return addSource(new SocketTextStreamFunction(hostname, port, 
delimiter), null,
-                               "socketStrean");
+                               "socketStream");
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 135f742..0d60939 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -29,10 +29,8 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
-import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.collector.StreamOutputWrapper;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -70,7 +68,7 @@ public class OutputHandler<OUT> {
                // registrations by outputname
                this.chainedConfigs = 
configuration.getTransitiveChainedTaskConfigs(cl);
                this.chainedConfigs.put(configuration.getTaskName(), 
configuration);
-                               
+
                this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
 
                // We iterate through all the out edges from this job vertex 
and create
@@ -105,21 +103,35 @@ public class OutputHandler<OUT> {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private Collector<OUT> createChainedCollector(StreamConfig 
chainedTaskConfig) {
 
+               boolean isDirectEmit = chainedTaskConfig.isDirectedEmit();
+
                // We create a wrapper that will encapsulate the chained 
operators and
                // network outputs
-               CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>();
+               CollectorWrapper<OUT> wrapper = isDirectEmit ? new 
DirectedCollectorWrapper(
+                               chainedTaskConfig.getOutputSelector(cl)) : new 
CollectorWrapper<OUT>();
+
+               // Create collectors for the network outputs
+               for (String output : chainedTaskConfig.getOutputs(cl)) {
 
-               // If the task has network outputs we create a collector for 
those and
-               // pass
-               // it to the wrapper
-               if (chainedTaskConfig.getNumberOfOutputs() > 0) {
-                       wrapper.addCollector((Collector<OUT>) 
createNetworkCollector(chainedTaskConfig));
+                       Collector<?> outCollector = outputMap.get(output);
+
+                       if (isDirectEmit) {
+                               ((DirectedCollectorWrapper<OUT>) 
wrapper).addCollector(outCollector,
+                                               
chainedTaskConfig.getSelectedNames(output));
+                       } else {
+                               wrapper.addCollector(outCollector);
+                       }
                }
 
-               // If the task has chained outputs we create a chained 
collector for
-               // each of them and pass it to the wrapper
+               // Create collectors for the chained outputs
                for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
-                       
wrapper.addCollector(createChainedCollector(chainedConfigs.get(output)));
+                       Collector<?> outCollector = 
createChainedCollector(chainedConfigs.get(output));
+                       if (isDirectEmit) {
+                               ((DirectedCollectorWrapper<OUT>) 
wrapper).addCollector(outCollector,
+                                               
chainedTaskConfig.getSelectedNames(output));
+                       } else {
+                               wrapper.addCollector(outCollector);
+                       }
                }
 
                if (chainedTaskConfig.isChainStart()) {
@@ -140,47 +152,6 @@ public class OutputHandler<OUT> {
 
        }
 
-       /**
-        * We create the collector for the network outputs of the task 
represented
-        * by the config using the StreamOutputs that we have set up in the
-        * constructor.
-        * 
-        * @param config
-        *            The config of the task
-        * @return We return a collector that represents all the network 
outputs of
-        *         this task
-        */
-       @SuppressWarnings("unchecked")
-       private <T> Collector<T> createNetworkCollector(StreamConfig config) {
-
-               StreamRecordSerializer<T> outSerializer = config
-                               .getTypeSerializerOut1(vertex.userClassLoader);
-               SerializationDelegate<StreamRecord<T>> outSerializationDelegate 
= null;
-
-               if (outSerializer != null) {
-                       outSerializationDelegate = new 
SerializationDelegate<StreamRecord<T>>(outSerializer);
-                       
outSerializationDelegate.setInstance(outSerializer.createInstance());
-               }
-
-               StreamOutputWrapper<T> collector;
-
-               if (vertex.configuration.isDirectedEmit()) {
-                       OutputSelector<T> outputSelector = vertex.configuration
-                                       
.getOutputSelector(vertex.userClassLoader);
-
-                       collector = new 
DirectedOutputWrapper<T>(vertex.getInstanceID(),
-                                       outSerializationDelegate, 
outputSelector);
-               } else {
-                       collector = new 
StreamOutputWrapper<T>(vertex.getInstanceID(), outSerializationDelegate);
-               }
-
-               for (String output : config.getOutputs(cl)) {
-                       collector.addOutput((StreamOutput<T>) 
outputMap.get(output));
-               }
-
-               return collector;
-       }
-
        public Collector<OUT> getCollector() {
                return outerCollector;
        }
@@ -197,7 +168,16 @@ public class OutputHandler<OUT> {
         */
        private <T> StreamOutput<T> createStreamOutput(String outputVertex, 
StreamConfig configuration,
                        int outputIndex) {
-               
+
+               StreamRecordSerializer<T> outSerializer = configuration
+                               .getTypeSerializerOut1(vertex.userClassLoader);
+               SerializationDelegate<StreamRecord<T>> outSerializationDelegate 
= null;
+
+               if (outSerializer != null) {
+                       outSerializationDelegate = new 
SerializationDelegate<StreamRecord<T>>(outSerializer);
+                       
outSerializationDelegate.setInstance(outSerializer.createInstance());
+               }
+
                StreamPartitioner<T> outputPartitioner = 
configuration.getPartitioner(cl, outputVertex);
 
                RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
@@ -221,9 +201,8 @@ public class OutputHandler<OUT> {
                        }
                }
 
-               StreamOutput<T> streamOutput = new StreamOutput<T>(output,
-                               configuration.isSelectAll(outputVertex) ? null
-                                               : 
configuration.getOutputNames(outputVertex));
+               StreamOutput<T> streamOutput = new StreamOutput<T>(output, 
vertex.instanceID,
+                               outSerializationDelegate);
 
                if (LOG.isTraceEnabled()) {
                        LOG.trace("Partitioner set: {} with {} outputs for {}", 
outputPartitioner.getClass()

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index cba23b8..bc89241 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -23,16 +23,13 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, 
OUT> {
+public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
 
@@ -75,10 +72,6 @@ public class StreamIterationHead<OUT extends Tuple> extends 
StreamVertex<OUT, OU
                }
 
                StreamRecord<OUT> nextRecord;
-               StreamRecordSerializer<OUT> serializer = configuration
-                               .getTypeSerializerOut1(userClassLoader);
-               SerializationDelegate<StreamRecord<OUT>> serializationDelegate 
= new SerializationDelegate<StreamRecord<OUT>>(
-                               serializer);
 
                List<StreamOutput<OUT>> outputs = new 
LinkedList<StreamOutput<OUT>>();
                for (StreamOutput<?> output : outputHandler.getOutputs()) {
@@ -95,8 +88,7 @@ public class StreamIterationHead<OUT extends Tuple> extends 
StreamVertex<OUT, OU
                                break;
                        }
                        for (StreamOutput<OUT> output : outputs) {
-                               serializationDelegate.setInstance(nextRecord);
-                               output.collect(serializationDelegate);
+                               output.collect(nextRecord.getObject());
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index 1883c06..b3ecdf8 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -20,14 +20,13 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN, 
IN> {
+public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationTail.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 0bc3d7d..49b3bf8 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -1,54 +1,52 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertArrayEquals;
+ */
 
-import java.util.ArrayList;
+package org.apache.flink.streaming.api.collector;
 
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
-import org.apache.flink.streaming.util.MockRecordWriterFactory;
-import org.junit.Test;
-
-public class StreamCollectorTest {
-
-       @Test
-       public void testCollect() {
-               MockRecordWriter recWriter = MockRecordWriterFactory.create();
-               SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new 
SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
-                               null);
-               sd.setInstance(new 
StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
-
-               StreamOutputWrapper<Tuple1<Integer>> collector = new 
StreamOutputWrapper<Tuple1<Integer>>(2, sd);
-               collector.addOutput(new 
StreamOutput<Tuple1<Integer>>(recWriter, new ArrayList<String>()));
-               collector.collect(new Tuple1<Integer>(3));
-               collector.collect(new Tuple1<Integer>(4));
-               collector.collect(new Tuple1<Integer>(5));
-               collector.collect(new Tuple1<Integer>(6));
-
-               assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, 
recWriter.emittedRecords.toArray());
-       }
-
-       @Test
-       public void testClose() {
-       }
-
-}
+import org.apache.flink.streaming.util.MockRecordWriterFactory;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class StreamCollectorTest {
+
+       @Test
+       public void testCollect() {
+               MockRecordWriter recWriter = MockRecordWriterFactory.create();
+               SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new 
SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
+                               null);
+               sd.setInstance(new 
StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
+
+               Collector<Tuple1<Integer>> collector = new 
StreamOutput<Tuple1<Integer>>(recWriter, 2, sd);
+               collector.collect(new Tuple1<Integer>(3));
+               collector.collect(new Tuple1<Integer>(4));
+               collector.collect(new Tuple1<Integer>(5));
+               collector.collect(new Tuple1<Integer>(6));
+
+               assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, 
recWriter.emittedRecords.toArray());
+       }
+
+       @Test
+       public void testClose() {
+       }
+
+}

Reply via email to