[FLINK-1345] [streaming] Advanced task chaining added

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbb55ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbb55ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbb55ec

Branch: refs/heads/master
Commit: 7dbb55ece0a9d9777c0e3254bc8f9f5cf566d535
Parents: 3e30c6f
Author: Gyula Fora <[email protected]>
Authored: Sun Jan 18 18:23:57 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 298 ++++++++-----------
 .../flink/streaming/api/StreamConfig.java       | 180 +++++++----
 .../api/collector/CollectorWrapper.java         |   2 +-
 .../streaming/api/collector/StreamOutput.java   |  15 +
 .../api/collector/StreamOutputWrapper.java      |   6 -
 .../api/streamvertex/OutputHandler.java         | 204 ++++++++-----
 .../api/streamvertex/StreamIterationHead.java   |  23 +-
 .../flink/streaming/api/scala/DataStream.scala  |  10 +
 8 files changed, 433 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 0020d48..6ae97c9 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,9 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,7 +35,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -85,11 +85,11 @@ public class JobGraphBuilder {
        private Map<String, Long> iterationWaitTime;
        private Map<String, Map<String, OperatorState<?>>> operatorStates;
        private Map<String, InputFormat<String, ?>> inputFormatList;
-       private Map<String, List<String>> chainedVertices;
-       private Map<String, String> lastInChains;
+       private Map<String, Map<String, StreamConfig>> chainedConfigs;
+       private Map<String, StreamConfig> vertexConfigs;
 
        private Set<String> sources;
-       private Set<String> builtVertices;
+       private Set<String> builtNodes;
 
        /**
         * Creates a new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -129,11 +129,11 @@ public class JobGraphBuilder {
                iterationWaitTime = new HashMap<String, Long>();
                operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
                inputFormatList = new HashMap<String, InputFormat<String, ?>>();
-               chainedVertices = new HashMap<String, List<String>>();
-               lastInChains = new HashMap<String, String>();
+               chainedConfigs = new HashMap<String, Map<String, 
StreamConfig>>();
+               vertexConfigs = new HashMap<String, StreamConfig>();
 
                sources = new HashSet<String>();
-               builtVertices = new HashSet<String>();
+               builtNodes = new HashSet<String>();
        }
 
        /**
@@ -198,6 +198,8 @@ public class JobGraphBuilder {
 
                addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
 
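+               // Chaining is disabled for the whole job once an iteration is added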
+               chaining = false;
+
                iterationIds.put(vertexName, iterationID);
                iterationIDtoHeadName.put(iterationID, vertexName);
 
@@ -297,7 +299,6 @@ public class JobGraphBuilder {
                inEdgeList.put(vertexName, new ArrayList<String>());
                outPartitioning.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
                iterationTailCount.put(vertexName, 0);
-               lastInChains.put(vertexName, vertexName);
        }
 
        private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
@@ -309,170 +310,175 @@ public class JobGraphBuilder {
                typeSerializersOut2.put(vertexName, out2);
        }
 
-       /**
-        * Creates an {@link AbstractJobVertex} in the {@link JobGraph} and 
sets its
-        * config parameters using the ones set previously.
-        * 
-        * @param vertexName
-        *            Name for which the vertex will be created.
-        */
-       private void createVertex(String vertexName) {
+       private List<Tuple2<String, String>> createChain(String startNode, 
String current) {
 
-               if (!builtVertices.contains(vertexName)) {
-                       if (!outEdgeList.get(vertexName).isEmpty()) {
+               if (!builtNodes.contains(startNode)) {
 
-                               for (String outName : 
outEdgeList.get(vertexName)) {
-                                       if (isChainable(vertexName, outName)) {
-                                               chainRecursively(vertexName, 
vertexName, outName);
-                                       } else {
-                                               createVertex(outName);
-                                       }
+                       List<Tuple2<String, String>> transitiveOutEdges = new 
ArrayList<Tuple2<String, String>>();
+                       List<String> chainableOutputs = new ArrayList<String>();
+                       List<String> nonChainableOutputs = new 
ArrayList<String>();
 
+                       for (String outName : outEdgeList.get(current)) {
+                               if (isChainable(current, outName)) {
+                                       chainableOutputs.add(outName);
+                               } else {
+                                       nonChainableOutputs.add(outName);
                                }
+
                        }
 
-                       List<String> chainedNames = 
chainedVertices.get(vertexName);
-                       boolean isChained = chainedNames != null;
-                       int numChained = isChained ? chainedNames.size() : 0;
-                       String lastInChain = lastInChains.get(vertexName);
-
-                       // Get vertex attributes
-                       Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
-                       StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
-                       int parallelism = vertexParallelism.get(vertexName);
-                       byte[] outputSelector = 
outputSelectors.get(lastInChain);
-                       Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
-
-                       // Create vertex object
-                       String cname = chainedVertices.get(vertexName) == null 
? "" : " => "
-                                       + 
StringUtils.join(chainedVertices.get(vertexName), " => ");
-                       AbstractJobVertex vertex = new 
AbstractJobVertex(vertexName + cname);
-
-                       this.jobGraph.addVertex(vertex);
-
-                       vertex.setInvokableClass(vertexClass);
-                       vertex.setParallelism(parallelism);
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Parallelism set: {} for {}", 
parallelism, vertexName);
+                       for (String chainable : chainableOutputs) {
+                               
transitiveOutEdges.addAll(createChain(startNode, chainable));
                        }
 
-                       // Set vertex config
+                       for (String nonChainable : nonChainableOutputs) {
+                               transitiveOutEdges.add(new Tuple2<String, 
String>(current, nonChainable));
+                               
transitiveOutEdges.addAll(createChain(nonChainable, nonChainable));
+                       }
 
-                       StreamConfig config = new 
StreamConfig(vertex.getConfiguration());
+                       StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode)
+                                       : new StreamConfig(new Configuration());
 
-                       config.setBufferTimeout(bufferTimeout.get(lastInChain));
+                       setVertexConfig(current, config, chainableOutputs, 
nonChainableOutputs);
 
-                       
config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
-                       
config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
-                       
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
-                       
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+                       if (current.equals(startNode)) {
 
-                       config.setUserInvokable(invokableObject);
-                       config.setOutputSelector(outputSelector);
-                       config.setOperatorStates(state);
+                               config.setChainStart();
+                               config.setRecordWriterOrder(transitiveOutEdges);
 
-                       if (vertexClass.equals(StreamIterationHead.class)
-                                       || 
vertexClass.equals(StreamIterationTail.class)) {
-                               
config.setIterationId(iterationIds.get(vertexName));
-                               
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
-                       }
+                               for (Tuple2<String, String> edge : 
transitiveOutEdges) {
+                                       connect(startNode, edge);
+                               }
 
-                       if (inputFormatList.containsKey(vertexName)) {
-                               
vertex.setInputSplitSource(inputFormatList.get(vertexName));
-                       }
+                               
vertexConfigs.get(startNode).setTransitiveChainedTaskConfigs(
+                                               chainedConfigs.get(startNode));
+
+                       } else {
 
-                       config.setNumberofChainedTasks(numChained);
+                               Map<String, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
 
-                       for (int i = 0; i < numChained; i++) {
-                               config.setChainedInvokable(
-                                               (ChainableInvokable<?, ?>) 
invokableObjects.get(chainedNames.get(i)), i);
-                               
config.setChainedSerializer(typeSerializersIn1.get(chainedNames.get(i)), i);
+                               if (chainedConfs == null) {
+                                       chainedConfigs.put(startNode, new 
HashMap<String, StreamConfig>());
+                               }
+                               chainedConfigs.get(startNode).put(current, 
config);
                        }
 
-                       streamVertices.put(vertexName, vertex);
-                       builtVertices.add(vertexName);
+                       return transitiveOutEdges;
 
+               } else {
+                       return new ArrayList<Tuple2<String, String>>();
                }
+       }
+
+       private StreamConfig createProcessingVertex(String vertexName) {
+
+               AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
+
+               this.jobGraph.addVertex(vertex);
 
+               int parallelism = vertexParallelism.get(vertexName);
+
+               vertex.setInvokableClass(vertexClasses.get(vertexName));
+               vertex.setParallelism(parallelism);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Parallelism set: {} for {}", parallelism, 
vertexName);
+               }
+
+               if (inputFormatList.containsKey(vertexName)) {
+                       
vertex.setInputSplitSource(inputFormatList.get(vertexName));
+               }
+
+               streamVertices.put(vertexName, vertex);
+               builtNodes.add(vertexName);
+
+               return new StreamConfig(vertex.getConfiguration());
        }
 
-       private void chainRecursively(String chainStart, String current, String 
next) {
-               // We chain the next operator to the start of this chain
-               chainTasks(chainStart, next);
-               // Now recursively chain the outputs of next (depth first)
-               for (String output : outEdgeList.get(next)) {
-                       if (isChainable(next, output)) {
-                               // Recursive call
-                               chainRecursively(chainStart, next, output);
-                       } else {
-                               // If not chainable we continue building the 
jobgraph from there
-                               createVertex(output);
-                       }
+       private void setVertexConfig(String vertexName, StreamConfig config,
+                       List<String> chainableOutputs, List<String> 
nonChainableOutputs) {
+
+               StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
+               byte[] outputSelector = outputSelectors.get(vertexName);
+               Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
+               Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
+
+               config.setVertexName(vertexName);
+
+               config.setBufferTimeout(bufferTimeout.get(vertexName));
+
+               config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+               config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+               
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+               
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+
+               config.setUserInvokable(invokableObject);
+               config.setOutputSelector(outputSelector);
+               config.setOperatorStates(state);
+
+               config.setNumberOfOutputs(nonChainableOutputs.size());
+               config.setOutputs(nonChainableOutputs);
+               config.setChainedOutputs(chainableOutputs);
+
+               if (vertexClass.equals(StreamIterationHead.class)
+                               || 
vertexClass.equals(StreamIterationTail.class)) {
+                       config.setIterationId(iterationIds.get(vertexName));
+                       
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
                }
+
+               vertexConfigs.put(vertexName, config);
        }
 
        private boolean isChainable(String vertexName, String outName) {
-               return outEdgeList.get(vertexName).size() == 1
-                               && inEdgeList.get(outName).size() == 1
+               return inEdgeList.get(outName).size() == 1
+                               && invokableObjects.get(outName) != null
                                && outputSelectors.get(vertexName) == null
                                && 
invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS
                                && 
(invokableObjects.get(vertexName).getChainingStrategy() == 
ChainingStrategy.HEAD || invokableObjects
                                                
.get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS)
-                               && 
outPartitioning.get(vertexName).get(0).getStrategy() == 
PartitioningStrategy.FORWARD
+                               && outPartitioning.get(vertexName)
+                                               
.get(outEdgeList.get(vertexName).indexOf(outName)).getStrategy() == 
PartitioningStrategy.FORWARD
                                && vertexParallelism.get(vertexName) == 
vertexParallelism.get(outName) && chaining;
        }
 
-       private void chainTasks(String first, String second) {
+       private <T> void connect(String headOfChain, Tuple2<String, String> 
edge) {
 
-               List<String> chained = chainedVertices.get(first);
-               if (chained == null) {
-                       chained = new ArrayList<String>();
-               }
-               chained.add(second);
-               chainedVertices.put(first, chained);
-               lastInChains.put(first, second);
-
-       }
+               String upStreamVertexName = edge.f0;
+               String downStreamVertexName = edge.f1;
 
-       /**
-        * Connects two vertices with the given names, partitioning and channel 
type
-        * 
-        * @param upStreamVertexName
-        *            Name of the upstream vertex, that will emit the values
-        * @param downStreamVertexName
-        *            Name of the downstream vertex, that will receive the 
values
-        * @param partitionerObject
-        *            The partitioner
-        */
-       private <T> void connect(String upStreamVertexName, String 
downStreamVertexName,
-                       StreamPartitioner<T> partitionerObject) {
+               int outputIndex = 
outEdgeList.get(upStreamVertexName).indexOf(downStreamVertexName);
 
-               AbstractJobVertex upStreamVertex = 
streamVertices.get(upStreamVertexName);
+               AbstractJobVertex headVertex = streamVertices.get(headOfChain);
                AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamVertexName);
 
-               StreamConfig config = new 
StreamConfig(upStreamVertex.getConfiguration());
+               StreamConfig downStreamConfig = new 
StreamConfig(downStreamVertex.getConfiguration());
+               StreamConfig upStreamConfig = new 
StreamConfig(headVertex.getConfiguration());
+
+               List<Integer> outEdgeIndexList = 
outEdgeIndex.get(upStreamVertexName);
+               int numOfInputs = downStreamConfig.getNumberOfInputs();
+
+               downStreamConfig.setInputIndex(numOfInputs++, 
outEdgeIndexList.get(outputIndex));
+               downStreamConfig.setNumberOfInputs(numOfInputs);
+
+               StreamPartitioner<?> partitionerObject = 
outPartitioning.get(upStreamVertexName).get(
+                               outputIndex);
+
+               upStreamConfig.setPartitioner(downStreamVertexName, 
partitionerObject);
 
                if (partitionerObject.getStrategy() == 
PartitioningStrategy.FORWARD) {
-                       downStreamVertex
-                                       
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
+                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
                } else {
-                       
downStreamVertex.connectNewDataSetAsInput(upStreamVertex,
-                                       DistributionPattern.ALL_TO_ALL);
+                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.ALL_TO_ALL);
                }
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("CONNECTED: {} - {} -> {}", 
partitionerObject.getClass().getSimpleName(),
-                                       upStreamVertexName, 
downStreamVertexName);
+                                       headOfChain, downStreamVertexName);
                }
 
-               int outputIndex = 
upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1;
-
-               config.setOutputName(outputIndex, 
outEdgeNames.get(lastInChains.get(upStreamVertexName))
+               upStreamConfig.setOutputNames(downStreamVertexName, 
outEdgeNames.get(upStreamVertexName)
                                .get(outputIndex));
-               config.setSelectAll(outputIndex, 
outEdgeSelectAll.get(lastInChains.get(upStreamVertexName))
+               upStreamConfig.setSelectAll(downStreamVertexName, 
outEdgeSelectAll.get(upStreamVertexName)
                                .get(outputIndex));
-               config.setPartitioner(outputIndex, partitionerObject);
-               config.setNumberOfOutputChannels(outputIndex, 
vertexParallelism.get(downStreamVertexName));
        }
 
        /**
@@ -630,27 +636,6 @@ public class JobGraphBuilder {
        }
 
        /**
-        * Writes number of inputs into each JobVertex's config
-        */
-       private void setNumberOfJobInputs() {
-               for (AbstractJobVertex vertex : streamVertices.values()) {
-                       (new 
StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex
-                                       .getNumberOfInputs());
-               }
-       }
-
-       /**
-        * Writes the number of outputs and output channels into each 
JobVertex's
-        * config
-        */
-       private void setNumberOfJobOutputs() {
-               for (AbstractJobVertex vertex : streamVertices.values()) {
-                       (new 
StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex
-                                       
.getNumberOfProducedIntermediateDataSets());
-               }
-       }
-
-       /**
         * Gets the assembled {@link JobGraph} and adds a default name for it.
         */
        public JobGraph getJobGraph() {
@@ -677,33 +662,10 @@ public class JobGraphBuilder {
        private void buildJobGraph() {
 
                for (String sourceName : sources) {
-                       createVertex(sourceName);
-               }
-
-               for (String upStreamVertexName : builtVertices) {
-                       int i = 0;
-
-                       List<Integer> outEdgeTypeList = 
outEdgeIndex.get(lastInChains.get(upStreamVertexName));
-
-                       for (String downStreamVertexName : outEdgeList
-                                       
.get(lastInChains.get(upStreamVertexName))) {
-                               StreamConfig downStreamVertexConfig = new 
StreamConfig(streamVertices.get(
-                                               
downStreamVertexName).getConfiguration());
-
-                               int inputNumber = 
downStreamVertexConfig.getNumberOfInputs();
-
-                               
downStreamVertexConfig.setInputIndex(inputNumber++, outEdgeTypeList.get(i));
-                               
downStreamVertexConfig.setNumberOfInputs(inputNumber);
-
-                               connect(upStreamVertexName, 
downStreamVertexName,
-                                               
outPartitioning.get(lastInChains.get(upStreamVertexName)).get(i));
-                               i++;
-                       }
+                       createChain(sourceName, sourceName);
                }
 
                setSlotSharing();
-               setNumberOfJobInputs();
-               setNumberOfJobOutputs();
        }
 
        public void setChaining(boolean chaining) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index ada3aae..6fffaa6 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api;
 
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -35,17 +35,20 @@ import 
org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
-public class StreamConfig {
+public class StreamConfig implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
        private static final String INPUT_TYPE = "inputType_";
        private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
        private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-       private static final String NUMBER_OF_CHAINED_TASKS = "numOfChained";
-       private static final String CHAINED_IN_SERIALIZER = 
"chainedSerializer_";
-       private static final String CHAINED_INVOKABLE = "chainedInvokable_";
+       private static final String CHAINED_OUTPUTS = "chainedOutputs";
+       private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+       private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
        private static final String OUTPUT_NAME = "outputName_";
        private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
        private static final String PARTITIONER_OBJECT = "partitionerObject_";
-       private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
+       private static final String VERTEX_NAME = "vertexName";
        private static final String ITERATION_ID = "iteration-id";
        private static final String OUTPUT_SELECTOR = "outputSelector";
        private static final String DIRECTED_EMIT = "directedEmit";
@@ -58,6 +61,8 @@ public class StreamConfig {
        private static final String TYPE_SERIALIZER_OUT_1 = 
"typeSerializer_out_1";
        private static final String TYPE_SERIALIZER_OUT_2 = 
"typeSerializer_out_2";
        private static final String ITERATON_WAIT = "iterationWait";
+       private static final String OUTPUTS = "outVertexNames";
+       private static final String RW_ORDER = "rwOrder";
 
        // DEFAULT VALUES
 
@@ -75,6 +80,14 @@ public class StreamConfig {
                return config;
        }
 
+       public void setVertexName(String vertexName) {
+               config.setString(VERTEX_NAME, vertexName);
+       }
+
+       public String getTaskName() {
+               return config.getString(VERTEX_NAME, "Missing");
+       }
+
        public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
                setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
        }
@@ -206,25 +219,21 @@ public class StreamConfig {
                return config.getLong(ITERATON_WAIT, 0);
        }
 
-       public void setNumberOfOutputChannels(int outputIndex, Integer 
numberOfOutputChannels) {
-               config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 
numberOfOutputChannels);
-       }
+       public <T> void setPartitioner(String output, StreamPartitioner<T> 
partitionerObject) {
 
-       public int getNumberOfOutputChannels(int outputIndex) {
-               return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + 
outputIndex, 0);
-       }
-
-       public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> 
partitionerObject) {
-
-               config.setBytes(PARTITIONER_OBJECT + outputIndex,
+               config.setBytes(PARTITIONER_OBJECT + output,
                                
SerializationUtils.serialize(partitionerObject));
        }
 
-       public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int 
outputIndex)
-                       throws ClassNotFoundException, IOException {
-               @SuppressWarnings("unchecked")
-               StreamPartitioner<T> partitioner = (StreamPartitioner<T>) 
InstantiationUtil
-                               .readObjectFromConfig(this.config, 
PARTITIONER_OBJECT + outputIndex, cl);
+       @SuppressWarnings("unchecked")
+       public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String 
output) {
+               StreamPartitioner<T> partitioner = null;
+               try {
+                       partitioner = (StreamPartitioner<T>) 
InstantiationUtil.readObjectFromConfig(
+                                       this.config, PARTITIONER_OBJECT + 
output, cl);
+               } catch (Exception e) {
+                       throw new RuntimeException("Partitioner could not be instantiated.", e);
+               }
                if (partitioner != null) {
                        return partitioner;
                } else {
@@ -232,27 +241,27 @@ public class StreamConfig {
                }
        }
 
-       public void setSelectAll(int outputIndex, Boolean selectAll) {
+       public void setSelectAll(String output, Boolean selectAll) {
                if (selectAll != null) {
-                       config.setBoolean(OUTPUT_SELECT_ALL + outputIndex, 
selectAll);
+                       config.setBoolean(OUTPUT_SELECT_ALL + output, 
selectAll);
                }
        }
 
-       public boolean isSelectAll(int outputIndex) {
-               return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, 
false);
+       public boolean isSelectAll(String output) {
+               return config.getBoolean(OUTPUT_SELECT_ALL + output, true);
        }
 
-       public void setOutputName(int outputIndex, List<String> outputName) {
+       public void setOutputNames(String output, List<String> outputName) {
                if (outputName != null) {
-                       config.setBytes(OUTPUT_NAME + outputIndex,
+                       config.setBytes(OUTPUT_NAME + output,
                                        
SerializationUtils.serialize((Serializable) outputName));
                }
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getOutputNames(int outputIndex) {
-               return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
-                               + outputIndex, null));
+       public List<String> getOutputNames(String output) {
+               return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
+                               null));
        }
 
        public void setNumberOfInputs(int numberOfInputs) {
@@ -271,6 +280,38 @@ public class StreamConfig {
                return config.getInteger(NUMBER_OF_OUTPUTS, 0);
        }
 
+       public void setOutputs(List<String> outputVertexNames) {
+               config.setBytes(OUTPUTS, 
SerializationUtils.serialize((Serializable) outputVertexNames));
+       }
+
+       @SuppressWarnings("unchecked")
+       public List<String> getOutputs(ClassLoader cl) {
+               try {
+                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not instantiate outputs.", e);
+               }
+       }
+
+       public void setRecordWriterOrder(List<Tuple2<String, String>> 
outEdgeList) {
+
+               List<String> outVertices = new ArrayList<String>();
+               for (Tuple2<String, String> edge : outEdgeList) {
+                       outVertices.add(edge.f1);
+               }
+
+               config.setBytes(RW_ORDER, 
SerializationUtils.serialize((Serializable) outVertices));
+       }
+
+       @SuppressWarnings("unchecked")
+       public List<String> getRecordWriterOrder(ClassLoader cl) {
+               try {
+                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config, RW_ORDER, cl);
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not instantiate record writer order.", e);
+               }
+       }
+
        public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
                config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
        }
@@ -293,40 +334,77 @@ public class StreamConfig {
                }
        }
 
-       public int getNumberofChainedTasks() {
-               return config.getInteger(NUMBER_OF_CHAINED_TASKS, 0);
+       public void setChainedOutputs(List<String> chainedOutputs) {
+               config.setBytes(CHAINED_OUTPUTS,
+                               SerializationUtils.serialize((Serializable) 
chainedOutputs));
        }
 
-       public void setNumberofChainedTasks(int n) {
-               config.setInteger(NUMBER_OF_CHAINED_TASKS, n);
-       }
-
-       public ChainableInvokable<?, ?> getChainedInvokable(int 
chainedTaskIndex, ClassLoader cl) {
+       @SuppressWarnings("unchecked")
+       public List<String> getChainedOutputs(ClassLoader cl) {
                try {
-                       return (ChainableInvokable<?, ?>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       CHAINED_INVOKABLE + chainedTaskIndex, 
cl);
+                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config,
+                                       CHAINED_OUTPUTS, cl);
                } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
invokable.");
+                       throw new RuntimeException("Could not instantiate chained outputs.", e);
                }
        }
 
-       public StreamRecordSerializer<?> getChainedInSerializer(int 
chainedTaskIndex, ClassLoader cl) {
+       public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> 
chainedTaskConfigs) {
+               config.setBytes(CHAINED_TASK_CONFIG,
+                               SerializationUtils.serialize((Serializable) 
chainedTaskConfigs));
+       }
+
+       @SuppressWarnings("unchecked")
+       public Map<String, StreamConfig> 
getTransitiveChainedTaskConfigs(ClassLoader cl) {
                try {
-                       return (StreamRecordSerializer<?>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       CHAINED_IN_SERIALIZER + 
chainedTaskIndex, cl);
+
+                       return (Map<String, StreamConfig>) 
InstantiationUtil.readObjectFromConfig(this.config,
+                                       CHAINED_TASK_CONFIG, cl);
                } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
serializer.");
+                       throw new RuntimeException("Could not instantiate configuration.", e);
                }
        }
 
-       public void setChainedSerializer(StreamRecordSerializer<?> typeWrapper, 
int chainedTaskIndex) {
-               config.setBytes(CHAINED_IN_SERIALIZER + chainedTaskIndex,
-                               SerializationUtils.serialize(typeWrapper));
+       public void setChainStart() {
+               config.setBoolean(IS_CHAINED_VERTEX, true);
        }
 
-       public void setChainedInvokable(ChainableInvokable<?, ?> invokable, int 
chainedTaskIndex) {
-               config.setBytes(CHAINED_INVOKABLE + chainedTaskIndex,
-                               SerializationUtils.serialize(invokable));
+       public boolean isChainStart() {
+               return config.getBoolean(IS_CHAINED_VERTEX, false);
        }
 
+       @Override
+       public String toString() {
+
+               ClassLoader cl = getClass().getClassLoader();
+
+               StringBuilder builder = new StringBuilder();
+               builder.append("\n=======================");
+               builder.append("Stream Config");
+               builder.append("=======================");
+               builder.append("\nTask name: " + getTaskName());
+               builder.append("\nNumber of non-chained inputs: " + 
getNumberOfInputs());
+               builder.append("\nNumber of non-chained outputs: " + 
getNumberOfOutputs());
+               builder.append("\nOutput names: " + getOutputs(cl));
+               builder.append("\nPartitioning:");
+               for (String outputname : getOutputs(cl)) {
+                       builder.append("\n\t" + outputname + ": "
+                                       + getPartitioner(cl, 
outputname).getClass().getSimpleName());
+               }
+
+               builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+
+               try {
+                       builder.append("\nInvokable: " + 
getUserInvokable(cl).getClass().getSimpleName());
+               } catch (Exception e) {
+                       builder.append("\nInvokable: Missing");
+               }
+               builder.append("\nBuffer timeout: " + getBufferTimeout());
+               if (isChainStart() && getChainedOutputs(cl).size() > 0) {
+                       builder.append("\n\n\n---------------------\nChained 
task configs\n---------------------\n");
+                       builder.append(getTransitiveChainedTaskConfigs(cl));
+               }
+
+               return builder.toString();
+       }
 }
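
StreamConfig now implements Serializable so that a chain head can carry the
configs of all its chained sub-tasks as a single serialized map in its own
vertex configuration. A minimal round-trip sketch of that pattern, assuming
commons-lang3 on the classpath (String values stand in for StreamConfig, a
plain byte[] for the Configuration entry):

    import java.util.HashMap;

    import org.apache.commons.lang3.SerializationUtils;

    public class ChainedConfigRoundTrip {

        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            HashMap<String, String> chainedConfigs = new HashMap<String, String>();
            chainedConfigs.put("map", "config of map");
            chainedConfigs.put("filter", "config of filter");

            // write side (JobGraphBuilder): one serialized blob per chain head
            byte[] blob = SerializationUtils.serialize(chainedConfigs);

            // read side (task manager): restore the whole map; Flink goes through
            // InstantiationUtil so the user-code class loader is honored
            HashMap<String, String> restored =
                    (HashMap<String, String>) SerializationUtils.deserialize(blob);

            System.out.println(restored);   // e.g. {filter=config of filter, map=config of map}
        }
    }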

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index b7e57e0..a95973b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -30,7 +30,7 @@ public class CollectorWrapper<OUT> implements Collector<OUT> {
                this.outputs = new LinkedList<Collector<OUT>>();
        }
 
-       public void addChainedOutput(Collector<OUT> output) {
+       public void addCollector(Collector<OUT> output) {
                outputs.add(output);
        }
 

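The renamed addCollector() reflects that CollectorWrapper now fans records out
to any mix of chained invokables and network outputs. A reduced sketch of the
pattern (the Collector interface is shrunk to two methods to stay
self-contained):

    import java.util.ArrayList;
    import java.util.List;

    interface SimpleCollector<T> {
        void collect(T record);
        void close();
    }

    public class CollectorWrapperSketch<T> implements SimpleCollector<T> {

        private final List<SimpleCollector<T>> outputs = new ArrayList<SimpleCollector<T>>();

        // a downstream collector can be a chained invokable or a network output
        public void addCollector(SimpleCollector<T> output) {
            outputs.add(output);
        }

        public void collect(T record) {
            // one collect() call is forwarded to every registered output
            for (SimpleCollector<T> out : outputs) {
                out.collect(record);
            }
        }

        public void close() {
            for (SimpleCollector<T> out : outputs) {
                out.close();
            }
        }
    }
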
http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 4c21564..6fd1b98 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -17,11 +17,13 @@
 
 package org.apache.flink.streaming.api.collector;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.util.Collector;
 
 public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<StreamRecord<OUT>>> {
@@ -52,6 +54,15 @@ public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<Stream
 
        @Override
        public void close() {
+               if (output instanceof StreamRecordWriter) {
+                       
((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
+               } else {
+                       try {
+                               output.flush();
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not flush stream output.", e);
+                       }
+               }
        }
 
        public boolean isSelectAll() {
@@ -62,4 +73,8 @@ public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<Stream
                return selectedNames;
        }
 
+       public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
getRecordWriter() {
+               return output;
+       }
+
 }
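
The new close() centralizes the flush-or-close decision that used to live in
OutputHandler.flushOutputs(): a StreamRecordWriter owns a background flusher
and must be closed, while a plain RecordWriter only needs a flush. A sketch of
that dispatch with hypothetical stand-in writer types:

    import java.io.IOException;

    class PlainWriter {
        public void flush() throws IOException {
            System.out.println("flushed");
        }
    }

    class TimedWriter extends PlainWriter {
        public void close() {
            // a real StreamRecordWriter also stops its flusher thread here
            System.out.println("closed, flusher stopped");
        }
    }

    public class CloseDispatchSketch {
        public static void main(String[] args) {
            PlainWriter[] writers = { new PlainWriter(), new TimedWriter() };
            for (PlainWriter output : writers) {
                if (output instanceof TimedWriter) {
                    ((TimedWriter) output).close();
                } else {
                    try {
                        output.flush();
                    } catch (IOException e) {
                        throw new RuntimeException("Could not flush output.", e);
                    }
                }
            }
        }
    }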

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
index fa374b1..c3e4c9d 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.collector;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
@@ -80,11 +79,6 @@ public class StreamOutputWrapper<OUT> implements 
Collector<OUT> {
                outputs.add(output);
        }
 
-       protected void 
addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       List<String> outputNames, boolean isSelectAllOutput) {
-
-       }
-
        /**
         * Collects and emits a tuple/object to the outputs by reusing a
         * StreamRecord object.

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 60a7b14..99f826d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -19,13 +19,15 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -45,83 +47,142 @@ public class OutputHandler<OUT> {
 
        private StreamVertex<?, OUT> vertex;
        private StreamConfig configuration;
-
-       private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
outputs;
+       private ClassLoader cl;
        private Collector<OUT> outerCollector;
 
-       TypeInformation<OUT> outTypeInfo = null;
-       StreamRecordSerializer<OUT> outSerializer = null;
-       SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = 
null;
-
        public List<ChainableInvokable<?, ?>> chainedInvokables;
 
-       private int numberOfChainedTasks;
+       private Map<String, StreamOutput<?>> outputMap;
+       private Map<String, StreamConfig> chainedConfigs;
+       private List<String> recordWriterOrder;
 
        public OutputHandler(StreamVertex<?, OUT> vertex) {
+
+               // Initialize some fields
                this.vertex = vertex;
-               this.outputs = new 
LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
                this.configuration = new 
StreamConfig(vertex.getTaskConfiguration());
                this.chainedInvokables = new ArrayList<ChainableInvokable<?, 
?>>();
-               this.numberOfChainedTasks = 
configuration.getNumberofChainedTasks();
+               this.outputMap = new HashMap<String, StreamOutput<?>>();
+               this.cl = vertex.getUserCodeClassLoader();
+
+               // We read the chained configs, and the order of record writer
+               // registrations by output name
+               this.chainedConfigs = 
configuration.getTransitiveChainedTaskConfigs(cl);
+               this.recordWriterOrder = configuration.getRecordWriterOrder(cl);
+
+               // For the network outputs of the chain head we create the 
stream
+               // outputs
+               for (String outName : configuration.getOutputs(cl)) {
+                       StreamOutput<?> streamOutput = 
createStreamOutput(outName, configuration);
+                       outputMap.put(outName, streamOutput);
+               }
 
-               this.outerCollector = createChainedCollector(0);
+               // If we have chained tasks we iterate through them and create 
the
+               // stream outputs for the network outputs
+               if (chainedConfigs != null) {
+                       for (StreamConfig config : chainedConfigs.values()) {
+                               for (String outName : config.getOutputs(cl)) {
+                                       StreamOutput<?> streamOutput = 
createStreamOutput(outName, config);
+                                       outputMap.put(outName, streamOutput);
+                               }
+                       }
+               }
+
+               // We create the outer collector that will be passed to the 
first task
+               // in the chain
+               this.outerCollector = createChainedCollector(configuration);
 
        }
 
-       public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
getOutputs() {
-               return outputs;
+       public Collection<StreamOutput<?>> getOutputs() {
+               return outputMap.values();
        }
 
-       // We create the outer collector by nesting the chainable invokables 
into
-       // each other
+       /**
+        * This method builds up a nested collector which encapsulates all the
+        * chained operators and their network outputs. The result of this recursive
+        * call will be passed as collector to the first invokable in the chain.
+        * 
+        * @param chainedTaskConfig
+        *            The configuration of the starting operator of the chain; we
+        *            use this parameter to recursively build the whole chain
+        * @return Returns the collector for the chain starting from the given
+        *         config
+        */
        @SuppressWarnings({ "unchecked", "rawtypes" })
-       private Collector<OUT> createChainedCollector(int chainedTaskIndex) {
+       private Collector<OUT> createChainedCollector(StreamConfig 
chainedTaskConfig) {
 
-               if (numberOfChainedTasks == chainedTaskIndex) {
-                       // At the end of the chain we create the collector that 
sends data
-                       // to the recordwriters
-                       return createNetworkCollector();
-               } else {
+               // We create a wrapper that will encapsulate the chained 
operators and
+               // network outputs
+               CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>();
 
-                       ChainableInvokable chainableInvokable = 
configuration.getChainedInvokable(
-                                       chainedTaskIndex, 
vertex.getUserCodeClassLoader());
+               // If the task has network outputs we create a collector for those
+               // and pass it to the wrapper
+               if (chainedTaskConfig.getNumberOfOutputs() > 0) {
+                       wrapper.addCollector((Collector<OUT>) 
createNetworkCollector(chainedTaskConfig));
+               }
 
-                       // The nesting is done by calling this method 
recursively when
-                       // passing the collector to the invokable
-                       chainableInvokable.setup(
-                                       createChainedCollector(chainedTaskIndex 
+ 1),
-                                       
configuration.getChainedInSerializer(chainedTaskIndex,
-                                                       
vertex.getUserCodeClassLoader()));
+               // If the task has chained outputs we create a chained 
collector for
+               // each of them and pass it to the wrapper
+               for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
+                       
wrapper.addCollector(createChainedCollector(chainedConfigs.get(output)));
+               }
 
-                       // We hold a list of the chained invokables for 
initializaton
-                       // afterwards
-                       chainedInvokables.add(chainableInvokable);
+               if (chainedTaskConfig.isChainStart()) {
+                       // The current task is the first chained task at this 
vertex so we
+                       // return the wrapper
+                       return wrapper;
+               } else {
+                       // The current task is a part of the chain so we get 
the chainable
+                       // invokable which will be returned and set it up using 
the wrapper
+                       ChainableInvokable chainableInvokable = 
chainedTaskConfig.getUserInvokable(vertex
+                                       .getUserCodeClassLoader());
+                       chainableInvokable.setup(wrapper,
+                                       
chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
 
+                       chainedInvokables.add(chainableInvokable);
                        return chainableInvokable;
                }
 
        }
 
-       private Collector<OUT> createNetworkCollector() {
+       /**
+        * We create the collector for the network outputs of the task 
represented
+        * by the config using the StreamOutputs that we have set up in the
+        * constructor.
+        * 
+        * @param config
+        *            The config of the task
+        * @return A collector that represents all the network outputs of
+        *         this task
+        */
+       @SuppressWarnings("unchecked")
+       private <T> Collector<T> createNetworkCollector(StreamConfig config) {
+
+               StreamRecordSerializer<T> outSerializer = config
+                               .getTypeSerializerOut1(vertex.userClassLoader);
+               SerializationDelegate<StreamRecord<T>> outSerializationDelegate 
= null;
 
-               createOutSerializer();
+               if (outSerializer != null) {
+                       outSerializationDelegate = new 
SerializationDelegate<StreamRecord<T>>(outSerializer);
+                       
outSerializationDelegate.setInstance(outSerializer.createInstance());
+               }
 
-               StreamOutputWrapper<OUT> collector;
+               StreamOutputWrapper<T> collector;
 
                if (vertex.configuration.isDirectedEmit()) {
-                       OutputSelector<OUT> outputSelector = 
vertex.configuration
+                       OutputSelector<T> outputSelector = vertex.configuration
                                        
.getOutputSelector(vertex.userClassLoader);
 
-                       collector = new 
DirectedOutputWrapper<OUT>(vertex.getInstanceID(),
+                       collector = new 
DirectedOutputWrapper<T>(vertex.getInstanceID(),
                                        outSerializationDelegate, 
outputSelector);
                } else {
-                       collector = new 
StreamOutputWrapper<OUT>(vertex.getInstanceID(),
-                                       outSerializationDelegate);
+                       collector = new 
StreamOutputWrapper<T>(vertex.getInstanceID(), outSerializationDelegate);
                }
 
-               int numberOfOutputs = configuration.getNumberOfOutputs();
-               for (int i = 0; i < numberOfOutputs; i++) {
-                       collector = (StreamOutputWrapper<OUT>) 
addStreamOutput(i, collector);
+               for (String output : config.getOutputs(cl)) {
+                       collector.addOutput((StreamOutput<T>) 
outputMap.get(output));
                }
 
                return collector;
@@ -131,32 +192,35 @@ public class OutputHandler<OUT> {
                return outerCollector;
        }
 
-       void createOutSerializer() {
-               outSerializer = 
configuration.getTypeSerializerOut1(vertex.userClassLoader);
-               if (outSerializer != null) {
-                       outSerializationDelegate = new 
SerializationDelegate<StreamRecord<OUT>>(outSerializer);
-                       
outSerializationDelegate.setInstance(outSerializer.createInstance());
-               }
-       }
+       /**
+        * We create the StreamOutput for the specific output given by its name,
+        * using the configuration of its source task.
+        * 
+        * @param name
+        *            Name of the output for which the StreamOutput will be set up
+        * @param configuration
+        *            The config of the upstream task
+        * @return The StreamOutput created for the given output name
+        */
+       private <T> StreamOutput<T> createStreamOutput(String name, 
StreamConfig configuration) {
 
-       Collector<OUT> addStreamOutput(int outputNumber, 
StreamOutputWrapper<OUT> networkCollector) {
+               int outputNumber = recordWriterOrder.indexOf(name);
 
-               StreamPartitioner<OUT> outputPartitioner;
+               StreamPartitioner<T> outputPartitioner;
 
                try {
-                       outputPartitioner = 
configuration.getPartitioner(vertex.userClassLoader,
-                                       outputNumber);
+                       outputPartitioner = 
configuration.getPartitioner(vertex.userClassLoader, name);
                } catch (Exception e) {
                        throw new StreamVertexException("Cannot deserialize 
partitioner for "
-                                       + vertex.getName() + " with " + 
outputNumber + " outputs", e);
+                                       + vertex.getName() + " for output " + name, e);
                }
 
-               RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+               RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
 
                long bufferTimeout = configuration.getBufferTimeout();
 
                if (bufferTimeout >= 0) {
-                       output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+                       output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
                                        .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
 
                        if (LOG.isTraceEnabled()) {
@@ -164,7 +228,7 @@ public class OutputHandler<OUT> {
                                                bufferTimeout, vertex.getClass().getSimpleName());
                        }
                } else {
-                       output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+                       output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
                                        .getEnvironment().getWriter(outputNumber), outputPartitioner);
 
                        if (LOG.isTraceEnabled()) {
@@ -172,34 +236,28 @@ public class OutputHandler<OUT> {
                        }
                }
 
-               outputs.add(output);
-
-               networkCollector.addOutput(new StreamOutput<OUT>(output, configuration
-                               .isSelectAll(outputNumber) ? null : configuration.getOutputNames(outputNumber)));
+               StreamOutput<T> streamOutput = new StreamOutput<T>(output,
+                               configuration.isSelectAll(name) ? null : configuration.getOutputNames(name));
 
                if (LOG.isTraceEnabled()) {
                        LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
                                        .getSimpleName(), outputNumber, vertex.getClass().getSimpleName());
                }
 
-               return networkCollector;
+               return streamOutput;
        }
 
        public void flushOutputs() throws IOException, InterruptedException {
-               for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-                       if (output instanceof StreamRecordWriter) {
-                               ((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-                       } else {
-                               output.flush();
-                       }
+               for (StreamOutput<?> streamOutput : getOutputs()) {
+                       streamOutput.close();
                }
        }
 
        public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
                        throws IOException, InterruptedException {
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("{} {} invoked with instance id {}", componentTypeName,
-                                       vertex.getName(), vertex.getInstanceID());
+                       LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
+                                       vertex.getInstanceID());
                }
 
                try {
@@ -210,8 +268,8 @@ public class OutputHandler<OUT> {
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
-                                       vertex.getName(), vertex.getInstanceID());
+                       LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
+                                       vertex.getInstanceID());
                }
 
                flushOutputs();

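The OutputHandler change above replaces the old index-based output registration
(addStreamOutput(i, collector)) with name-keyed lookups into an output map, so
chained (in-JVM) and network outputs can be wired through the same code path.
Below is a minimal, self-contained sketch of that wiring idea; NamedOutput and
FanOutCollector are illustrative stand-ins for StreamOutput and the collector
wrappers, not the committed classes.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified sketch of the name-keyed output wiring in the diff above.
    public class NamedOutputSketch {

        // Stand-in for StreamOutput<T>: one named output channel.
        static class NamedOutput<T> {
            final String name;
            final List<T> received = new ArrayList<T>();

            NamedOutput(String name) {
                this.name = name;
            }

            void collect(T record) {
                received.add(record);
            }
        }

        // Stand-in for the collector wrapper: fans each record out to
        // every output registered for this operator.
        static class FanOutCollector<T> {
            private final List<NamedOutput<T>> outputs = new ArrayList<NamedOutput<T>>();

            void addOutput(NamedOutput<T> output) {
                outputs.add(output);
            }

            void collect(T record) {
                for (NamedOutput<T> output : outputs) {
                    output.collect(record);
                }
            }
        }

        public static void main(String[] args) {
            // Outputs are created once per named edge and kept in a map,
            // mirroring outputMap.get(output) in the diff above.
            Map<String, NamedOutput<String>> outputMap = new HashMap<String, NamedOutput<String>>();
            outputMap.put("forward", new NamedOutput<String>("forward"));
            outputMap.put("broadcast", new NamedOutput<String>("broadcast"));

            // The operator's collector is wired by name, not by index.
            FanOutCollector<String> collector = new FanOutCollector<String>();
            for (String name : outputMap.keySet()) {
                collector.addOutput(outputMap.get(name));
            }

            collector.collect("record-1");
            System.out.println(outputMap.get("forward").received); // [record-1]
        }
    }

Keying outputs by name rather than position is what lets a chained output and a
network output be registered uniformly when tasks are fused into one vertex.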
http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index 43b455e..cba23b8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -17,19 +17,22 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
+public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OUT> {
 
        private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
@@ -72,6 +75,15 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
                }
 
                StreamRecord<OUT> nextRecord;
+               StreamRecordSerializer<OUT> serializer = configuration
+                               .getTypeSerializerOut1(userClassLoader);
+               SerializationDelegate<StreamRecord<OUT>> serializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+                               serializer);
+
+               List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
+               for (StreamOutput<?> output : outputHandler.getOutputs()) {
+                       outputs.add((StreamOutput<OUT>) output);
+               }
 
                while (true) {
                        if (shouldWait) {
@@ -82,10 +94,9 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT
                        if (nextRecord == null) {
                                break;
                        }
-                       for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
-                                       .getOutputs()) {
-                               outputHandler.outSerializationDelegate.setInstance(nextRecord);
-                               output.emit(outputHandler.outSerializationDelegate);
+                       for (StreamOutput<OUT> output : outputs) {
+                               serializationDelegate.setInstance(nextRecord);
+                               output.collect(serializationDelegate);
                        }
                }
 
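The iteration head now pulls feedback records from its blocking queue and
re-emits them through typed StreamOutputs via a locally created
SerializationDelegate, instead of reusing the handler's shared delegate and
raw RecordWriters. A rough, self-contained sketch of that dequeue-and-broadcast
loop follows; Output stands in for StreamOutput, plain Strings stand in for
StreamRecord, and the hard-coded 100 ms poll stands in for the configured
iteration wait time.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Sketch of the iteration-head loop in the diff above.
    public class IterationHeadSketch {

        interface Output<T> {
            void collect(T record);
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> feedback = new ArrayBlockingQueue<String>(16);
            feedback.put("r1");
            feedback.put("r2");

            List<Output<String>> outputs = Arrays.<Output<String>>asList(
                    new Output<String>() {
                        public void collect(String record) {
                            System.out.println("out1: " + record);
                        }
                    },
                    new Output<String>() {
                        public void collect(String record) {
                            System.out.println("out2: " + record);
                        }
                    });

            while (true) {
                // Wait briefly for a feedback record; a null poll means the
                // iteration has drained and the head can shut down.
                String next = feedback.poll(100, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break;
                }
                // Broadcast the record to every output of the iteration head.
                for (Output<String> output : outputs) {
                    output.collect(next);
                }
            }
        }
    }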

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 53b75a0..698b193 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -78,6 +79,15 @@ class DataStream[T](javaStream: JavaStream[T]) {
         " "  +
         "parallelism.")
   }
+  
+  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for operators.")
+    }
+    this
+  }
 
   /**
    * Creates a new DataStream by merging DataStream outputs of

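The new Scala setChainingStrategy method above simply delegates to its Java
counterpart on SingleOutputStreamOperator. Below is a hedged usage sketch
against the Java API of this branch: package names follow the imports in the
diffs above, and the ChainingStrategy.NEVER constant is assumed to exist (as
it does in later Flink versions); none of it is verified against this exact
commit.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;

    // Hypothetical example: disable chaining for a single operator so it
    // runs in its own task instead of being fused with the source.
    public class ChainingExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            SingleOutputStreamOperator<String, ?> mapped = env
                    .fromElements("a", "b", "c")
                    .map(new MapFunction<String, String>() {
                        public String map(String value) {
                            return value.toUpperCase();
                        }
                    });

            // Keep the mapper out of any chain; other strategies control
            // whether an operator may start or join a chain.
            mapped.setChainingStrategy(ChainingStrategy.NEVER);
            mapped.print();

            env.execute();
        }
    }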