[FLINK-1345] [streaming] Chaining refactor + ChainingStrategy exposed through 
the API for operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e30c6f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e30c6f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e30c6f7

Branch: refs/heads/master
Commit: 3e30c6f73a12e2f50449a4d0dce452031f9a7317
Parents: 26535c4
Author: Gyula Fora <[email protected]>
Authored: Fri Jan 16 15:11:38 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 103 ++++++++---------
 .../flink/streaming/api/StreamConfig.java       |  21 +---
 .../api/datastream/ConnectedDataStream.java     |   2 +-
 .../streaming/api/datastream/DataStream.java    |  13 ++-
 .../api/datastream/DataStreamSink.java          |   5 +-
 .../api/datastream/DataStreamSource.java        |   5 +-
 .../api/datastream/IterativeDataStream.java     |   3 +-
 .../datastream/SingleOutputStreamOperator.java  |  12 +-
 .../environment/StreamExecutionEnvironment.java |   9 +-
 .../api/invokable/ChainableInvokable.java       |   1 +
 .../api/invokable/StreamInvokable.java          |  20 +++-
 .../operator/GroupedReduceInvokable.java        |   6 +-
 .../operator/GroupedWindowInvokable.java        |   1 +
 .../api/invokable/operator/WindowInvokable.java |   3 +-
 .../api/streamvertex/CoStreamVertex.java        |   2 +-
 .../api/streamvertex/OutputHandler.java         | 110 +++++++++----------
 16 files changed, 159 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 35e145e..0020d48 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
@@ -64,7 +65,7 @@ public class JobGraphBuilder {
        private Map<String, Integer> vertexParallelism;
        private Map<String, Long> bufferTimeout;
        private Map<String, List<String>> outEdgeList;
-       private Map<String, List<Integer>> outEdgeType;
+       private Map<String, List<Integer>> outEdgeIndex;
        private Map<String, List<List<String>>> outEdgeNames;
        private Map<String, List<Boolean>> outEdgeSelectAll;
        private Map<String, List<String>> inEdgeList;
@@ -85,8 +86,7 @@ public class JobGraphBuilder {
        private Map<String, Map<String, OperatorState<?>>> operatorStates;
        private Map<String, InputFormat<String, ?>> inputFormatList;
        private Map<String, List<String>> chainedVertices;
-       private Map<String, List<ChainableInvokable<?, ?>>> chainedInvokable;
-       private Map<String, List<StreamRecordSerializer<?>>> chainedSerializer;
+       private Map<String, String> lastInChains;
 
        private Set<String> sources;
        private Set<String> builtVertices;
@@ -97,11 +97,19 @@ public class JobGraphBuilder {
         */
        public JobGraphBuilder() {
 
+               initGraph();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("JobGraph created");
+               }
+       }
+
+       public void initGraph() {
                streamVertices = new HashMap<String, AbstractJobVertex>();
                vertexParallelism = new HashMap<String, Integer>();
                bufferTimeout = new HashMap<String, Long>();
                outEdgeList = new HashMap<String, List<String>>();
-               outEdgeType = new HashMap<String, List<Integer>>();
+               outEdgeIndex = new HashMap<String, List<Integer>>();
                outEdgeNames = new HashMap<String, List<List<String>>>();
                outEdgeSelectAll = new HashMap<String, List<Boolean>>();
                inEdgeList = new HashMap<String, List<String>>();
@@ -121,16 +129,11 @@ public class JobGraphBuilder {
                iterationWaitTime = new HashMap<String, Long>();
                operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
                inputFormatList = new HashMap<String, InputFormat<String, ?>>();
-               chainedInvokable = new HashMap<String, 
List<ChainableInvokable<?, ?>>>();
-               chainedSerializer = new HashMap<String, 
List<StreamRecordSerializer<?>>>();
                chainedVertices = new HashMap<String, List<String>>();
+               lastInChains = new HashMap<String, String>();
 
                sources = new HashSet<String>();
                builtVertices = new HashSet<String>();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("JobGraph created");
-               }
        }
 
        /**
@@ -198,7 +201,7 @@ public class JobGraphBuilder {
                iterationIds.put(vertexName, iterationID);
                iterationIDtoHeadName.put(iterationID, vertexName);
 
-               setBytesFrom(iterationHead, vertexName);
+               setSerializersFrom(iterationHead, vertexName);
 
                setEdge(vertexName, iterationHead, outPartitioning
                                
.get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(),
@@ -241,7 +244,7 @@ public class JobGraphBuilder {
                iterationIds.put(vertexName, iterationID);
                iterationIDtoTailName.put(iterationID, vertexName);
 
-               setBytesFrom(iterationTail, vertexName);
+               setSerializersFrom(iterationTail, vertexName);
                iterationWaitTime.put(iterationIDtoTailName.get(iterationID), 
waitTime);
 
                if (LOG.isDebugEnabled()) {
@@ -288,12 +291,13 @@ public class JobGraphBuilder {
                invokableObjects.put(vertexName, invokableObject);
                operatorNames.put(vertexName, operatorName);
                outEdgeList.put(vertexName, new ArrayList<String>());
-               outEdgeType.put(vertexName, new ArrayList<Integer>());
+               outEdgeIndex.put(vertexName, new ArrayList<Integer>());
                outEdgeNames.put(vertexName, new ArrayList<List<String>>());
                outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>());
                inEdgeList.put(vertexName, new ArrayList<String>());
                outPartitioning.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
                iterationTailCount.put(vertexName, 0);
+               lastInChains.put(vertexName, vertexName);
        }
 
        private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
@@ -327,11 +331,16 @@ public class JobGraphBuilder {
                                }
                        }
 
+                       List<String> chainedNames = 
chainedVertices.get(vertexName);
+                       boolean isChained = chainedNames != null;
+                       int numChained = isChained ? chainedNames.size() : 0;
+                       String lastInChain = lastInChains.get(vertexName);
+
                        // Get vertex attributes
                        Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
                        StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
                        int parallelism = vertexParallelism.get(vertexName);
-                       byte[] outputSelector = outputSelectors.get(vertexName);
+                       byte[] outputSelector = 
outputSelectors.get(lastInChain);
                        Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
 
                        // Create vertex object
@@ -347,16 +356,17 @@ public class JobGraphBuilder {
                                LOG.debug("Parallelism set: {} for {}", 
parallelism, vertexName);
                        }
 
+                       // Set vertex config
+
                        StreamConfig config = new 
StreamConfig(vertex.getConfiguration());
 
-                       config.setBufferTimeout(bufferTimeout.get(vertexName));
+                       config.setBufferTimeout(bufferTimeout.get(lastInChain));
 
                        
config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
                        
config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
                        
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
                        
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
 
-                       // Set vertex config
                        config.setUserInvokable(invokableObject);
                        config.setOutputSelector(outputSelector);
                        config.setOperatorStates(state);
@@ -371,15 +381,12 @@ public class JobGraphBuilder {
                                
vertex.setInputSplitSource(inputFormatList.get(vertexName));
                        }
 
-                       List<ChainableInvokable<?, ?>> chainedInvokables = 
chainedInvokable.get(vertexName);
-                       List<StreamRecordSerializer<?>> chainedSerializers = 
chainedSerializer.get(vertexName);
-
-                       int numChained = chainedInvokables == null ? 0 : 
chainedInvokables.size();
                        config.setNumberofChainedTasks(numChained);
 
                        for (int i = 0; i < numChained; i++) {
-                               
config.setChainedInvokable(chainedInvokables.get(i), i);
-                               
config.setChainedSerializer(chainedSerializers.get(i), i);
+                               config.setChainedInvokable(
+                                               (ChainableInvokable<?, ?>) 
invokableObjects.get(chainedNames.get(i)), i);
+                               
config.setChainedSerializer(typeSerializersIn1.get(chainedNames.get(i)), i);
                        }
 
                        streamVertices.put(vertexName, vertex);
@@ -390,12 +397,15 @@ public class JobGraphBuilder {
        }
 
        private void chainRecursively(String chainStart, String current, String 
next) {
+               // We chain the next operator to the start of this chain
                chainTasks(chainStart, next);
-               // Add multiple chaining here
+               // Now recursively chain the outputs of next (depth first)
                for (String output : outEdgeList.get(next)) {
                        if (isChainable(next, output)) {
+                               // Recursive call
                                chainRecursively(chainStart, next, output);
                        } else {
+                               // If not chainable we continue building the 
jobgraph from there
                                createVertex(output);
                        }
                }
@@ -405,25 +415,14 @@ public class JobGraphBuilder {
                return outEdgeList.get(vertexName).size() == 1
                                && inEdgeList.get(outName).size() == 1
                                && outputSelectors.get(vertexName) == null
-                               && invokableObjects.get(outName).isChainable()
+                               && 
invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS
+                               && 
(invokableObjects.get(vertexName).getChainingStrategy() == 
ChainingStrategy.HEAD || invokableObjects
+                                               
.get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS)
                                && 
outPartitioning.get(vertexName).get(0).getStrategy() == 
PartitioningStrategy.FORWARD
                                && vertexParallelism.get(vertexName) == 
vertexParallelism.get(outName) && chaining;
        }
 
        private void chainTasks(String first, String second) {
-               List<ChainableInvokable<?, ?>> chainedInvokables = 
chainedInvokable.get(first);
-               if (chainedInvokables == null) {
-                       chainedInvokables = new ArrayList<ChainableInvokable<?, 
?>>();
-               }
-               chainedInvokables.add((ChainableInvokable<?, ?>) 
invokableObjects.get(second));
-               chainedInvokable.put(first, chainedInvokables);
-
-               List<StreamRecordSerializer<?>> chainedSerializers = 
chainedSerializer.get(first);
-               if (chainedSerializers == null) {
-                       chainedSerializers = new 
ArrayList<StreamRecordSerializer<?>>();
-               }
-               chainedSerializers.add(typeSerializersIn1.get(second));
-               chainedSerializer.put(first, chainedSerializers);
 
                List<String> chained = chainedVertices.get(first);
                if (chained == null) {
@@ -431,16 +430,7 @@ public class JobGraphBuilder {
                }
                chained.add(second);
                chainedVertices.put(first, chained);
-
-               outEdgeList.put(first, outEdgeList.get(second));
-               typeSerializersOut1.put(first, typeSerializersOut1.get(second));
-               outPartitioning.put(first, outPartitioning.get(second));
-               outEdgeType.put(first, outEdgeType.get(second));
-               outEdgeNames.put(first, outEdgeNames.get(second));
-               outEdgeSelectAll.put(first, outEdgeSelectAll.get(second));
-               outPartitioning.put(first, outPartitioning.get(second));
-               bufferTimeout.put(first, bufferTimeout.get(second));
-               outputSelectors.put(first, outputSelectors.get(second));
+               lastInChains.put(first, second);
 
        }
 
@@ -477,8 +467,10 @@ public class JobGraphBuilder {
 
                int outputIndex = 
upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1;
 
-               config.setOutputName(outputIndex, 
outEdgeNames.get(upStreamVertexName).get(outputIndex));
-               config.setSelectAll(outputIndex, 
outEdgeSelectAll.get(upStreamVertexName).get(outputIndex));
+               config.setOutputName(outputIndex, 
outEdgeNames.get(lastInChains.get(upStreamVertexName))
+                               .get(outputIndex));
+               config.setSelectAll(outputIndex, 
outEdgeSelectAll.get(lastInChains.get(upStreamVertexName))
+                               .get(outputIndex));
                config.setPartitioner(outputIndex, partitionerObject);
                config.setNumberOfOutputChannels(outputIndex, 
vertexParallelism.get(downStreamVertexName));
        }
@@ -547,7 +539,7 @@ public class JobGraphBuilder {
                        StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames,
                        boolean selectAll) {
                outEdgeList.get(upStreamVertexName).add(downStreamVertexName);
-               outEdgeType.get(upStreamVertexName).add(typeNumber);
+               outEdgeIndex.get(upStreamVertexName).add(typeNumber);
                inEdgeList.get(downStreamVertexName).add(upStreamVertexName);
                outPartitioning.get(upStreamVertexName).add(partitionerObject);
                outEdgeNames.get(upStreamVertexName).add(outputNames);
@@ -608,7 +600,7 @@ public class JobGraphBuilder {
         * @param to
         *            to
         */
-       public void setBytesFrom(String from, String to) {
+       public void setSerializersFrom(String from, String to) {
                operatorNames.put(to, operatorNames.get(from));
 
                typeSerializersIn1.put(to, typeSerializersOut1.get(from));
@@ -691,19 +683,20 @@ public class JobGraphBuilder {
                for (String upStreamVertexName : builtVertices) {
                        int i = 0;
 
-                       List<Integer> outEdgeTypeList = 
outEdgeType.get(upStreamVertexName);
+                       List<Integer> outEdgeTypeList = 
outEdgeIndex.get(lastInChains.get(upStreamVertexName));
 
-                       for (String downStreamVertexName : 
outEdgeList.get(upStreamVertexName)) {
+                       for (String downStreamVertexName : outEdgeList
+                                       
.get(lastInChains.get(upStreamVertexName))) {
                                StreamConfig downStreamVertexConfig = new 
StreamConfig(streamVertices.get(
                                                
downStreamVertexName).getConfiguration());
 
                                int inputNumber = 
downStreamVertexConfig.getNumberOfInputs();
 
-                               
downStreamVertexConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
+                               
downStreamVertexConfig.setInputIndex(inputNumber++, outEdgeTypeList.get(i));
                                
downStreamVertexConfig.setNumberOfInputs(inputNumber);
 
                                connect(upStreamVertexName, 
downStreamVertexName,
-                                               
outPartitioning.get(upStreamVertexName).get(i));
+                                               
outPartitioning.get(lastInChains.get(upStreamVertexName)).get(i));
                                i++;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 22bcf92..ada3aae 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -24,7 +24,6 @@ import java.util.Map;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
@@ -170,7 +169,7 @@ public class StreamConfig {
                config.setBoolean(DIRECTED_EMIT, directedEmit);
        }
 
-       public boolean getDirectedEmit() {
+       public boolean isDirectedEmit() {
                return config.getBoolean(DIRECTED_EMIT, false);
        }
 
@@ -239,7 +238,7 @@ public class StreamConfig {
                }
        }
 
-       public boolean getSelectAll(int outputIndex) {
+       public boolean isSelectAll(int outputIndex) {
                return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, 
false);
        }
 
@@ -272,26 +271,14 @@ public class StreamConfig {
                return config.getInteger(NUMBER_OF_OUTPUTS, 0);
        }
 
-       public void setInputType(int inputNumber, Integer inputTypeNumber) {
+       public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
                config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
        }
 
-       public int getInputType(int inputNumber) {
+       public int getInputIndex(int inputNumber) {
                return config.getInteger(INPUT_TYPE + inputNumber, 0);
        }
 
-       public void setFunctionClass(Class<? extends AbstractRichFunction> 
functionClass) {
-               config.setClass("functionClass", functionClass);
-       }
-
-       public Class<? extends AbstractRichFunction> 
getFunctionClass(ClassLoader cl) {
-               try {
-                       return config.getClass("functionClass", null, cl);
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load function 
class", e);
-               }
-       }
-
        public void setOperatorStates(Map<String, OperatorState<?>> states) {
                config.setBytes(OPERATOR_STATES, 
SerializationUtils.serialize((Serializable) states));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index a0c8ff8..80ea970 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -400,7 +400,7 @@ public class ConnectedDataStream<IN1, IN2> {
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<OUT, ?> returnStream = new 
SingleOutputStreamOperator(
-                               environment, functionName, outTypeInfo);
+                               environment, functionName, outTypeInfo, 
functionInvokable);
 
                dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), 
functionInvokable,
                                getInputType1(), getInputType2(), outTypeInfo, 
functionName,

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index db644e9..f68ab68 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1083,7 +1083,7 @@ public class DataStream<OUT> {
        protected <R> DataStream<OUT> addIterationSource(Integer iterationID, 
long waitTime) {
 
                DataStream<R> returnStream = new 
DataStreamSource<R>(environment, "iterationSource", null,
-                               true);
+                               null, true);
 
                jobGraphBuilder.addIterationHead(returnStream.getId(), 
this.getId(), iterationID,
                                degreeOfParallelism, waitTime);
@@ -1110,7 +1110,7 @@ public class DataStream<OUT> {
                DataStream<OUT> inputStream = this.copy();
                @SuppressWarnings({ "unchecked", "rawtypes" })
                SingleOutputStreamOperator<R, ?> returnStream = new 
SingleOutputStreamOperator(environment,
-                               operatorName, outTypeInfo);
+                               operatorName, outTypeInfo, invokable);
 
                jobGraphBuilder.addStreamVertex(returnStream.getId(), 
invokable, getType(), outTypeInfo,
                                operatorName, degreeOfParallelism);
@@ -1174,10 +1174,13 @@ public class DataStream<OUT> {
         */
        public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
 
-               DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType());
+               StreamInvokable<OUT, OUT> sinkInvokable = new 
SinkInvokable<OUT>(clean(sinkFunction));
 
-               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SinkInvokable<OUT>(
-                               clean(sinkFunction)), getType(), null, "sink", 
degreeOfParallelism);
+               DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType(),
+                               sinkInvokable);
+
+               jobGraphBuilder.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null,
+                               "sink", degreeOfParallelism);
 
                this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 61fc557..f064332 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 /**
  * Represents the end of a DataStream.
@@ -29,8 +30,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, 
DataStreamSink<IN>> {
 
        protected DataStreamSink(StreamExecutionEnvironment environment, String 
operatorType,
-                       TypeInformation<IN> outTypeInfo) {
-               super(environment, operatorType, outTypeInfo);
+                       TypeInformation<IN> outTypeInfo, StreamInvokable<?,?> 
invokable) {
+               super(environment, operatorType, outTypeInfo, invokable);
        }
 
        protected DataStreamSink(DataStream<IN> dataStream) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index a649f59..b596cbd 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 /**
  * The DataStreamSource represents the starting point of a DataStream.
@@ -31,8 +32,8 @@ public class DataStreamSource<OUT> extends 
SingleOutputStreamOperator<OUT, DataS
        boolean isParallel;
 
        public DataStreamSource(StreamExecutionEnvironment environment, String 
operatorType,
-                       TypeInformation<OUT> outTypeInfo, boolean isParallel) {
-               super(environment, operatorType, outTypeInfo);
+                       TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> 
invokable, boolean isParallel) {
+               super(environment, operatorType, outTypeInfo, invokable);
                this.isParallel = isParallel;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 88feb5d..c4cd1e1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -61,7 +61,8 @@ public class IterativeDataStream<IN> extends
         * 
         */
        public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
-               DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "iterationSink", null);
+               DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "iterationSink", null,
+                               null);
 
                jobGraphBuilder.addIterationTail(iterationSink.getId(), 
iterationTail.getId(), iterationID,
                                iterationTail.getParallelism(), waitTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7a0abe4..b8ca42c 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.state.OperatorState;
 
@@ -42,12 +44,14 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                DataStream<OUT> {
 
        protected boolean isSplit;
+       protected StreamInvokable<?, ?> invokable;
 
        protected SingleOutputStreamOperator(StreamExecutionEnvironment 
environment,
-                       String operatorType, TypeInformation<OUT> outTypeInfo) {
+                       String operatorType, TypeInformation<OUT> outTypeInfo, 
StreamInvokable<?, ?> invokable) {
                super(environment, operatorType, outTypeInfo);
                setBufferTimeout(environment.getBufferTimeout());
                this.isSplit = false;
+               this.invokable = invokable;
        }
 
        @SuppressWarnings("unchecked")
@@ -55,6 +59,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                super(dataStream);
                if (dataStream instanceof SingleOutputStreamOperator) {
                        this.isSplit = ((SingleOutputStreamOperator<OUT, ?>) 
dataStream).isSplit;
+                       this.invokable = ((SingleOutputStreamOperator<OUT, ?>) 
dataStream).invokable;
                }
        }
 
@@ -192,4 +197,9 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                return new SingleOutputStreamOperator<OUT, O>(this);
        }
 
+       public SingleOutputStreamOperator<OUT, O> 
setChainingStrategy(ChainingStrategy strategy) {
+               this.invokable.setChainingStrategy(strategy);
+               return this;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 08fd19b..4194864 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction
 import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -431,11 +432,13 @@ public abstract class StreamExecutionEnvironment {
                boolean isParallel = function instanceof ParallelSourceFunction;
                int dop = isParallel ? getDegreeOfParallelism() : 1;
 
+               StreamInvokable<OUT, OUT> sourceInvokable = new 
SourceInvokable<OUT>(function);
+
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, sourceName,
-                               outTypeInfo, isParallel);
+                               outTypeInfo, sourceInvokable, isParallel);
 
-               jobGraphBuilder.addSourceVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
-                               null, outTypeInfo, sourceName, dop);
+               jobGraphBuilder.addSourceVertex(returnStream.getId(), 
sourceInvokable, null, outTypeInfo,
+                               sourceName, dop);
 
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
index 373b4e8..24c0319 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -28,6 +28,7 @@ public abstract class ChainableInvokable<IN, OUT> extends 
StreamInvokable<IN, OU
 
        public ChainableInvokable(Function userFunction) {
                super(userFunction);
+               setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
        public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> 
inSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 25e9221..614b67f 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -59,6 +59,8 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        protected Function userFunction;
        protected volatile boolean isRunning;
 
+       private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+
        public StreamInvokable(Function userFunction) {
                this.userFunction = userFunction;
        }
@@ -160,7 +162,21 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
                return objectSerializer.copy(record);
        }
 
-       public boolean isChainable() {
-               return this instanceof ChainableInvokable;
+       public void setChainingStrategy(ChainingStrategy strategy) {
+               if (strategy == ChainingStrategy.ALWAYS) {
+                       if (!(this instanceof ChainableInvokable)) {
+                               throw new RuntimeException(
+                                               "Invokable needs to extend 
ChainableInvokable to be chained");
+                       }
+               }
+               this.chainingStrategy = strategy;
+       }
+
+       public ChainingStrategy getChainingStrategy() {
+               return chainingStrategy;
+       }
+
+       public static enum ChainingStrategy {
+               ALWAYS, NEVER, HEAD;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index 84258d6..01c0545 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -34,6 +34,7 @@ public class GroupedReduceInvokable<IN> extends 
StreamReduceInvokable<IN> {
                super(reducer);
                this.keySelector = keySelector;
                values = new HashMap<Object, IN>();
+               setChainingStrategy(ChainingStrategy.NEVER);
        }
 
        @Override
@@ -56,9 +57,4 @@ public class GroupedReduceInvokable<IN> extends 
StreamReduceInvokable<IN> {
                reduced = reducer.reduce(currentValue, nextValue);
        }
 
-       @Override
-       public boolean isChainable() {
-               return false;
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 33348e4..bbd7b0c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -154,6 +154,7 @@ public class GroupedWindowInvokable<IN, OUT> extends 
StreamInvokable<IN, OUT> {
                        LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) 
{
 
                super(userFunction);
+               setChainingStrategy(ChainingStrategy.NEVER);
 
                this.keySelector = keySelector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index ea891c9..de9c664 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -61,7 +61,8 @@ public abstract class WindowInvokable<IN, OUT> extends 
StreamInvokable<IN, OUT>
        public WindowInvokable(Function userFunction, 
LinkedList<TriggerPolicy<IN>> triggerPolicies,
                        LinkedList<EvictionPolicy<IN>> evictionPolicies) {
                super(userFunction);
-
+               setChainingStrategy(ChainingStrategy.NEVER);
+               
                this.triggerPolicies = triggerPolicies;
                this.evictionPolicies = evictionPolicies;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index f065d9c..2b650be 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -80,7 +80,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends 
StreamVertex<IN1, OUT> {
                ArrayList<BufferReader> inputList2 = new 
ArrayList<BufferReader>();
 
                for (int i = 0; i < numberOfInputs; i++) {
-                       int inputType = configuration.getInputType(i);
+                       int inputType = configuration.getInputIndex(i);
                        BufferReader reader = getEnvironment().getReader(i);
                        switch (inputType) {
                                case 1:

http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 457f3f8..60a7b14 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -44,11 +43,11 @@ import org.slf4j.LoggerFactory;
 public class OutputHandler<OUT> {
        private static final Logger LOG = 
LoggerFactory.getLogger(OutputHandler.class);
 
-       private StreamVertex<?, OUT> streamVertex;
+       private StreamVertex<?, OUT> vertex;
        private StreamConfig configuration;
 
        private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
outputs;
-       private Collector<OUT> endCollector;
+       private Collector<OUT> outerCollector;
 
        TypeInformation<OUT> outTypeInfo = null;
        StreamRecordSerializer<OUT> outSerializer = null;
@@ -58,18 +57,14 @@ public class OutputHandler<OUT> {
 
        private int numberOfChainedTasks;
 
-       public OutputHandler(StreamVertex<?, OUT> streamComponent) {
-               this.streamVertex = streamComponent;
+       public OutputHandler(StreamVertex<?, OUT> vertex) {
+               this.vertex = vertex;
                this.outputs = new 
LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-               this.configuration = new 
StreamConfig(streamComponent.getTaskConfiguration());
+               this.configuration = new 
StreamConfig(vertex.getTaskConfiguration());
                this.chainedInvokables = new ArrayList<ChainableInvokable<?, 
?>>();
+               this.numberOfChainedTasks = 
configuration.getNumberofChainedTasks();
 
-               try {
-                       setConfigOutputs();
-               } catch (StreamVertexException e) {
-                       throw new StreamVertexException("Cannot register 
outputs for "
-                                       + 
streamComponent.getClass().getSimpleName(), e);
-               }
+               this.outerCollector = createChainedCollector(0);
 
        }
 
@@ -77,84 +72,83 @@ public class OutputHandler<OUT> {
                return outputs;
        }
 
-       private void setConfigOutputs() {
-               numberOfChainedTasks = configuration.getNumberofChainedTasks();
-               endCollector = createChainedOutputs(0);
-       }
-
-       @SuppressWarnings("unchecked")
-       private Collector<OUT> createChainedOutputs(int chainedTaskIndex) {
+       // We create the outer collector by nesting the chainable invokables 
into
+       // each other
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       private Collector<OUT> createChainedCollector(int chainedTaskIndex) {
 
                if (numberOfChainedTasks == chainedTaskIndex) {
-                       return createEndCollector();
+                       // At the end of the chain we create the collector that 
sends data
+                       // to the recordwriters
+                       return createNetworkCollector();
                } else {
-                       CollectorWrapper<OUT> chainedCollector = new 
CollectorWrapper<OUT>();
 
-                       @SuppressWarnings("rawtypes")
                        ChainableInvokable chainableInvokable = 
configuration.getChainedInvokable(
-                                       chainedTaskIndex, 
streamVertex.getUserCodeClassLoader());
+                                       chainedTaskIndex, 
vertex.getUserCodeClassLoader());
 
+                       // The nesting is done by calling this method 
recursively when
+                       // passing the collector to the invokable
                        chainableInvokable.setup(
-                                       createChainedOutputs(chainedTaskIndex + 
1),
+                                       createChainedCollector(chainedTaskIndex 
+ 1),
                                        
configuration.getChainedInSerializer(chainedTaskIndex,
-                                                       
streamVertex.getUserCodeClassLoader()));
+                                                       
vertex.getUserCodeClassLoader()));
 
+                       // We hold a list of the chained invokables for 
initializaton
+                       // afterwards
                        chainedInvokables.add(chainableInvokable);
 
-                       chainedCollector.addChainedOutput((Collector<OUT>) 
chainableInvokable);
-
-                       return chainedCollector;
+                       return chainableInvokable;
                }
 
        }
 
-       private Collector<OUT> createEndCollector() {
+       private Collector<OUT> createNetworkCollector() {
 
-               setSerializers();
+               createOutSerializer();
 
                StreamOutputWrapper<OUT> collector;
 
-               if (streamVertex.configuration.getDirectedEmit()) {
-                       OutputSelector<OUT> outputSelector = 
streamVertex.configuration
-                                       
.getOutputSelector(streamVertex.userClassLoader);
+               if (vertex.configuration.isDirectedEmit()) {
+                       OutputSelector<OUT> outputSelector = 
vertex.configuration
+                                       
.getOutputSelector(vertex.userClassLoader);
 
-                       collector = new 
DirectedOutputWrapper<OUT>(streamVertex.getInstanceID(),
+                       collector = new 
DirectedOutputWrapper<OUT>(vertex.getInstanceID(),
                                        outSerializationDelegate, 
outputSelector);
                } else {
-                       collector = new 
StreamOutputWrapper<OUT>(streamVertex.getInstanceID(),
+                       collector = new 
StreamOutputWrapper<OUT>(vertex.getInstanceID(),
                                        outSerializationDelegate);
                }
 
                int numberOfOutputs = configuration.getNumberOfOutputs();
                for (int i = 0; i < numberOfOutputs; i++) {
-                       collector = (StreamOutputWrapper<OUT>) 
setPartitioner(i, collector);
+                       collector = (StreamOutputWrapper<OUT>) 
addStreamOutput(i, collector);
                }
 
                return collector;
        }
 
        public Collector<OUT> getCollector() {
-               return endCollector;
+               return outerCollector;
        }
 
-       void setSerializers() {
-               outSerializer = 
configuration.getTypeSerializerOut1(streamVertex.userClassLoader);
+       void createOutSerializer() {
+               outSerializer = 
configuration.getTypeSerializerOut1(vertex.userClassLoader);
                if (outSerializer != null) {
                        outSerializationDelegate = new 
SerializationDelegate<StreamRecord<OUT>>(outSerializer);
                        
outSerializationDelegate.setInstance(outSerializer.createInstance());
                }
        }
 
-       Collector<OUT> setPartitioner(int outputNumber, 
StreamOutputWrapper<OUT> endCollector) {
-               StreamPartitioner<OUT> outputPartitioner = null;
+       Collector<OUT> addStreamOutput(int outputNumber, 
StreamOutputWrapper<OUT> networkCollector) {
+
+               StreamPartitioner<OUT> outputPartitioner;
 
                try {
-                       outputPartitioner = 
configuration.getPartitioner(streamVertex.userClassLoader,
+                       outputPartitioner = 
configuration.getPartitioner(vertex.userClassLoader,
                                        outputNumber);
-
                } catch (Exception e) {
                        throw new StreamVertexException("Cannot deserialize 
partitioner for "
-                                       + streamVertex.getName() + " with " + 
outputNumber + " outputs", e);
+                                       + vertex.getName() + " with " + 
outputNumber + " outputs", e);
                }
 
                RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
@@ -162,37 +156,33 @@ public class OutputHandler<OUT> {
                long bufferTimeout = configuration.getBufferTimeout();
 
                if (bufferTimeout >= 0) {
-                       output = new 
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex
+                       output = new 
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
                                        
.getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
 
                        if (LOG.isTraceEnabled()) {
                                LOG.trace("StreamRecordWriter initiated with {} 
bufferTimeout for {}",
-                                               bufferTimeout, 
streamVertex.getClass().getSimpleName());
+                                               bufferTimeout, 
vertex.getClass().getSimpleName());
                        }
                } else {
-                       output = new 
RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex
+                       output = new 
RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
                                        
.getEnvironment().getWriter(outputNumber), outputPartitioner);
 
                        if (LOG.isTraceEnabled()) {
-                               LOG.trace("RecordWriter initiated for {}", 
streamVertex.getClass().getSimpleName());
+                               LOG.trace("RecordWriter initiated for {}", 
vertex.getClass().getSimpleName());
                        }
                }
 
                outputs.add(output);
-               List<String> outputNames = 
configuration.getOutputNames(outputNumber);
-               boolean isSelectAllOutput = 
configuration.getSelectAll(outputNumber);
 
-               if (endCollector != null) {
-                       endCollector.addOutput(new StreamOutput<OUT>(output, 
isSelectAllOutput ? null
-                                       : outputNames));
-               }
+               networkCollector.addOutput(new StreamOutput<OUT>(output, 
configuration
+                               .isSelectAll(outputNumber) ? null : 
configuration.getOutputNames(outputNumber)));
 
                if (LOG.isTraceEnabled()) {
                        LOG.trace("Partitioner set: {} with {} outputs for {}", 
outputPartitioner.getClass()
-                                       .getSimpleName(), outputNumber, 
streamVertex.getClass().getSimpleName());
+                                       .getSimpleName(), outputNumber, 
vertex.getClass().getSimpleName());
                }
 
-               return endCollector;
+               return networkCollector;
        }
 
        public void flushOutputs() throws IOException, InterruptedException {
@@ -205,17 +195,15 @@ public class OutputHandler<OUT> {
                }
        }
 
-       long startTime;
-
        public void invokeUserFunction(String componentTypeName, 
StreamInvokable<?, OUT> userInvokable)
                        throws IOException, InterruptedException {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("{} {} invoked with instance id {}", 
componentTypeName,
-                                       streamVertex.getName(), 
streamVertex.getInstanceID());
+                                       vertex.getName(), 
vertex.getInstanceID());
                }
 
                try {
-                       streamVertex.invokeUserFunction(userInvokable);
+                       vertex.invokeUserFunction(userInvokable);
                } catch (Exception e) {
                        flushOutputs();
                        throw new RuntimeException(e);
@@ -223,7 +211,7 @@ public class OutputHandler<OUT> {
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("{} {} invoke finished instance id {}", 
componentTypeName,
-                                       streamVertex.getName(), 
streamVertex.getInstanceID());
+                                       vertex.getName(), 
vertex.getInstanceID());
                }
 
                flushOutputs();

Reply via email to