[FLINK-1345] [streaming] Basic operator chaining added

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26535c48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26535c48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26535c48

Branch: refs/heads/master
Commit: 26535c487753975556aafc19c2ce4f9c24cc677b
Parents: d9b942b
Author: Gyula Fora <[email protected]>
Authored: Thu Jan 15 22:32:13 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:33 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 222 +++++++++++++------
 .../flink/streaming/api/StreamConfig.java       |  56 +++--
 .../api/collector/CollectorWrapper.java         |   2 +-
 .../api/datastream/IterativeDataStream.java     |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   5 +
 .../environment/StreamExecutionEnvironment.java |   2 +-
 .../api/invokable/ChainableInvokable.java       |  38 ++++
 .../streaming/api/invokable/SinkInvokable.java  |  10 +-
 .../api/invokable/StreamInvokable.java          |  23 +-
 .../invokable/operator/CounterInvokable.java    |  10 +-
 .../api/invokable/operator/FilterInvokable.java |  18 +-
 .../invokable/operator/FlatMapInvokable.java    |  12 +-
 .../operator/GroupedReduceInvokable.java        |   7 +-
 .../api/invokable/operator/MapInvokable.java    |  12 +-
 .../operator/StreamReduceInvokable.java         |  15 +-
 .../operator/co/CoWindowInvokable.java          |   2 +-
 .../api/streamvertex/OutputHandler.java         |  86 +++++--
 .../api/streamvertex/StreamVertex.java          |  24 +-
 .../examples/iteration/IterateExample.java      |   3 +-
 19 files changed, 406 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 649072f..35e145e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -32,8 +34,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -56,6 +57,8 @@ public class JobGraphBuilder {
        private final static String DEAFULT_JOB_NAME = "Streaming Job";
        private JobGraph jobGraph;
 
+       private boolean chaining = true;
+
        // Graph attributes
        private Map<String, AbstractJobVertex> streamVertices;
        private Map<String, Integer> vertexParallelism;
@@ -65,7 +68,7 @@ public class JobGraphBuilder {
        private Map<String, List<List<String>>> outEdgeNames;
        private Map<String, List<Boolean>> outEdgeSelectAll;
        private Map<String, List<String>> inEdgeList;
-       private Map<String, List<StreamPartitioner<?>>> connectionTypes;
+       private Map<String, List<StreamPartitioner<?>>> outPartitioning;
        private Map<String, String> operatorNames;
        private Map<String, StreamInvokable<?, ?>> invokableObjects;
        private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
@@ -81,6 +84,12 @@ public class JobGraphBuilder {
        private Map<String, Long> iterationWaitTime;
        private Map<String, Map<String, OperatorState<?>>> operatorStates;
        private Map<String, InputFormat<String, ?>> inputFormatList;
+       private Map<String, List<String>> chainedVertices;
+       private Map<String, List<ChainableInvokable<?, ?>>> chainedInvokable;
+       private Map<String, List<StreamRecordSerializer<?>>> chainedSerializer;
+
+       private Set<String> sources;
+       private Set<String> builtVertices;
 
        /**
         * Creates an new {@link JobGraph} with the given name. A JobGraph is a 
DAG
@@ -96,7 +105,7 @@ public class JobGraphBuilder {
                outEdgeNames = new HashMap<String, List<List<String>>>();
                outEdgeSelectAll = new HashMap<String, List<Boolean>>();
                inEdgeList = new HashMap<String, List<String>>();
-               connectionTypes = new HashMap<String, 
List<StreamPartitioner<?>>>();
+               outPartitioning = new HashMap<String, 
List<StreamPartitioner<?>>>();
                operatorNames = new HashMap<String, String>();
                invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
                typeSerializersIn1 = new HashMap<String, 
StreamRecordSerializer<?>>();
@@ -112,6 +121,12 @@ public class JobGraphBuilder {
                iterationWaitTime = new HashMap<String, Long>();
                operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
                inputFormatList = new HashMap<String, InputFormat<String, ?>>();
+               chainedInvokable = new HashMap<String, 
List<ChainableInvokable<?, ?>>>();
+               chainedSerializer = new HashMap<String, 
List<StreamRecordSerializer<?>>>();
+               chainedVertices = new HashMap<String, List<String>>();
+
+               sources = new HashSet<String>();
+               builtVertices = new HashSet<String>();
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("JobGraph created");
@@ -152,32 +167,12 @@ public class JobGraphBuilder {
                }
        }
 
-       /**
-        * Adds a source vertex to the streaming JobGraph with the given 
parameters
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param function
-        *            User defined function
-        * @param inTypeInfo
-        *            Input type for serialization
-        * @param outTypeInfo
-        *            Output type for serialization
-        * @param operatorName
-        *            Operator type
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       public <IN, OUT> void addSourceVertex(String vertexName, 
SourceFunction<OUT> function,
-                       TypeInformation<IN> inTypeInfo, TypeInformation<OUT> 
outTypeInfo, String operatorName,
-                       byte[] serializedFunction, int parallelism) {
-
-               @SuppressWarnings("unchecked")
-               StreamInvokable<IN, OUT> invokableObject = (StreamInvokable<IN, 
OUT>) new SourceInvokable<OUT>(
-                               function);
-
+       /**
+        * Adds a source vertex to the streaming JobGraph. The vertex itself is
+        * registered through addStreamVertex with the same arguments; in addition
+        * its name is recorded in the sources set, which buildJobGraph iterates
+        * to start vertex creation.
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param invokableObject
+        *            Invokable handed on to addStreamVertex for this vertex
+        * @param inTypeInfo
+        *            Input type for serialization
+        * @param outTypeInfo
+        *            Output type for serialization
+        * @param operatorName
+        *            Operator type
+        * @param parallelism
+        *            Number of parallel instances created
+        */
+       public <IN, OUT> void addSourceVertex(String vertexName,
+                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
+                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
                addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
                                parallelism);
+               // Remember the vertex as a starting point for buildJobGraph()
+               sources.add(vertexName);
        }
 
        /**
@@ -205,7 +200,7 @@ public class JobGraphBuilder {
 
                setBytesFrom(iterationHead, vertexName);
 
-               setEdge(vertexName, iterationHead, connectionTypes
+               setEdge(vertexName, iterationHead, outPartitioning
                                
.get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(),
                                false);
 
@@ -214,6 +209,8 @@ public class JobGraphBuilder {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("ITERATION SOURCE: {}", vertexName);
                }
+
+               sources.add(vertexName);
        }
 
        /**
@@ -295,7 +292,7 @@ public class JobGraphBuilder {
                outEdgeNames.put(vertexName, new ArrayList<List<String>>());
                outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>());
                inEdgeList.put(vertexName, new ArrayList<String>());
-               connectionTypes.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
+               outPartitioning.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
                iterationTailCount.put(vertexName, 0);
        }
 
@@ -317,50 +314,134 @@ public class JobGraphBuilder {
         */
        private void createVertex(String vertexName) {
 
-               // Get vertex attributes
-               Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
-               StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
-               int parallelism = vertexParallelism.get(vertexName);
-               byte[] outputSelector = outputSelectors.get(vertexName);
-               Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
+               if (!builtVertices.contains(vertexName)) {
+                       if (!outEdgeList.get(vertexName).isEmpty()) {
 
-               // Create vertex object
-               AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
+                               for (String outName : 
outEdgeList.get(vertexName)) {
+                                       if (isChainable(vertexName, outName)) {
+                                               chainRecursively(vertexName, 
vertexName, outName);
+                                       } else {
+                                               createVertex(outName);
+                                       }
 
-               this.jobGraph.addVertex(vertex);
+                               }
+                       }
+
+                       // Get vertex attributes
+                       Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
+                       StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
+                       int parallelism = vertexParallelism.get(vertexName);
+                       byte[] outputSelector = outputSelectors.get(vertexName);
+                       Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
+
+                       // Create vertex object
+                       String cname = chainedVertices.get(vertexName) == null 
? "" : " => "
+                                       + 
StringUtils.join(chainedVertices.get(vertexName), " => ");
+                       AbstractJobVertex vertex = new 
AbstractJobVertex(vertexName + cname);
+
+                       this.jobGraph.addVertex(vertex);
+
+                       vertex.setInvokableClass(vertexClass);
+                       vertex.setParallelism(parallelism);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Parallelism set: {} for {}", 
parallelism, vertexName);
+                       }
+
+                       StreamConfig config = new 
StreamConfig(vertex.getConfiguration());
+
+                       config.setBufferTimeout(bufferTimeout.get(vertexName));
+
+                       
config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+                       
config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+                       
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+                       
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+
+                       // Set vertex config
+                       config.setUserInvokable(invokableObject);
+                       config.setOutputSelector(outputSelector);
+                       config.setOperatorStates(state);
+
+                       if (vertexClass.equals(StreamIterationHead.class)
+                                       || 
vertexClass.equals(StreamIterationTail.class)) {
+                               
config.setIterationId(iterationIds.get(vertexName));
+                               
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
+                       }
+
+                       if (inputFormatList.containsKey(vertexName)) {
+                               
vertex.setInputSplitSource(inputFormatList.get(vertexName));
+                       }
+
+                       List<ChainableInvokable<?, ?>> chainedInvokables = 
chainedInvokable.get(vertexName);
+                       List<StreamRecordSerializer<?>> chainedSerializers = 
chainedSerializer.get(vertexName);
+
+                       int numChained = chainedInvokables == null ? 0 : 
chainedInvokables.size();
+                       config.setNumberofChainedTasks(numChained);
+
+                       for (int i = 0; i < numChained; i++) {
+                               
config.setChainedInvokable(chainedInvokables.get(i), i);
+                               
config.setChainedSerializer(chainedSerializers.get(i), i);
+                       }
+
+                       streamVertices.put(vertexName, vertex);
+                       builtVertices.add(vertexName);
 
-               vertex.setInvokableClass(vertexClass);
-               vertex.setParallelism(parallelism);
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Parallelism set: {} for {}", parallelism, 
vertexName);
                }
 
-               StreamConfig config = new 
StreamConfig(vertex.getConfiguration());
+       }
 
-               config.setBufferTimeout(bufferTimeout.get(vertexName));
+       /**
+        * Merges the vertex next into the chain that starts at chainStart and then
+        * continues along the outputs of next: every output that isChainable
+        * accepts is merged into the same chain recursively, every other output is
+        * built as a vertex of its own through createVertex.
+        * 
+        * @param chainStart
+        *            Name of the vertex at the head of the chain
+        * @param current
+        *            Name of the vertex preceding next; not read by this method
+        * @param next
+        *            Name of the vertex to append to the chain
+        */
+       private void chainRecursively(String chainStart, String current, String 
next) {
+               chainTasks(chainStart, next);
+               // Add multiple chaining here
+               for (String output : outEdgeList.get(next)) {
+                       if (isChainable(next, output)) {
+                               chainRecursively(chainStart, next, output);
+                       } else {
+                               createVertex(output);
+                       }
+               }
+       }
 
-               config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
-               config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
-               
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
-               
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
+       /**
+        * Decides whether the vertex outName can be chained to its upstream vertex
+        * vertexName.
+        * 
+        * All of the following must hold: the upstream vertex has exactly one
+        * output edge, the downstream vertex has exactly one input edge, the
+        * upstream vertex has no output selector, the downstream invokable reports
+        * itself as chainable, the first output edge of the upstream vertex uses
+        * FORWARD partitioning, both vertices have the same parallelism, and
+        * chaining is enabled on this builder.
+        * 
+        * @param vertexName
+        *            Name of the upstream vertex
+        * @param outName
+        *            Name of the downstream vertex that is the chaining candidate
+        * @return true if outName can be chained to vertexName
+        */
+       private boolean isChainable(String vertexName, String outName) {
+               return outEdgeList.get(vertexName).size() == 1
+                               && inEdgeList.get(outName).size() == 1
+                               && outputSelectors.get(vertexName) == null
+                               && invokableObjects.get(outName).isChainable()
+                               && outPartitioning.get(vertexName).get(0).getStrategy() == PartitioningStrategy.FORWARD
+                               // The map stores boxed Integers, so compare by value: == would
+                               // compare object identity and can reject equal parallelisms.
+                               && vertexParallelism.get(vertexName).equals(vertexParallelism.get(outName))
+                               && chaining;
+       }
 
-               // Set vertex config
-               config.setUserInvokable(invokableObject);
-               config.setVertexName(vertexName);
-               config.setOutputSelector(outputSelector);
-               config.setOperatorStates(state);
+       /**
+        * Appends the vertex second to the chain that starts at the vertex first.
+        * 
+        * The invokable, the input serializer and the name of second are added to
+        * the per-chain lists kept under first. Afterwards first takes over the
+        * output side of second (output edges, output serializer, partitioners,
+        * edge types, output names, select-all flags, buffer timeout and output
+        * selector), so that createVertex and buildJobGraph treat first as the
+        * producer of second's outputs.
+        * 
+        * @param first
+        *            Name of the vertex at the head of the chain
+        * @param second
+        *            Name of the vertex being chained
+        */
+       private void chainTasks(String first, String second) {
+               List<ChainableInvokable<?, ?>> chainedInvokables = chainedInvokable.get(first);
+               if (chainedInvokables == null) {
+                       chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
+               }
+               // NOTE(review): the cast relies on callers checking isChainable() first;
+               // presumably only ChainableInvokable subclasses report true there.
+               chainedInvokables.add((ChainableInvokable<?, ?>) invokableObjects.get(second));
+               chainedInvokable.put(first, chainedInvokables);
 
-               if (vertexClass.equals(StreamIterationHead.class)
-                               || vertexClass.equals(StreamIterationTail.class)) {
-                       config.setIterationId(iterationIds.get(vertexName));
-                       config.setIterationWaitTime(iterationWaitTime.get(vertexName));
+               List<StreamRecordSerializer<?>> chainedSerializers = chainedSerializer.get(first);
+               if (chainedSerializers == null) {
+                       chainedSerializers = new ArrayList<StreamRecordSerializer<?>>();
                }
+               chainedSerializers.add(typeSerializersIn1.get(second));
+               chainedSerializer.put(first, chainedSerializers);
 
-               if (inputFormatList.containsKey(vertexName)) {
-                       vertex.setInputSplitSource(inputFormatList.get(vertexName));
+               List<String> chained = chainedVertices.get(first);
+               if (chained == null) {
+                       chained = new ArrayList<String>();
                }
+               chained.add(second);
+               chainedVertices.put(first, chained);
+
+               // The chain head inherits the output side of the chained vertex
+               outEdgeList.put(first, outEdgeList.get(second));
+               typeSerializersOut1.put(first, typeSerializersOut1.get(second));
+               outPartitioning.put(first, outPartitioning.get(second));
+               outEdgeType.put(first, outEdgeType.get(second));
+               outEdgeNames.put(first, outEdgeNames.get(second));
+               outEdgeSelectAll.put(first, outEdgeSelectAll.get(second));
+               bufferTimeout.put(first, bufferTimeout.get(second));
+               outputSelectors.put(first, outputSelectors.get(second));
 
-               streamVertices.put(vertexName, vertex);
        }
 
        /**
@@ -385,8 +466,8 @@ public class JobGraphBuilder {
                        downStreamVertex
                                        
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
                } else {
-                       downStreamVertex
-                                       
.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.ALL_TO_ALL);
+                       
downStreamVertex.connectNewDataSetAsInput(upStreamVertex,
+                                       DistributionPattern.ALL_TO_ALL);
                }
 
                if (LOG.isDebugEnabled()) {
@@ -468,7 +549,7 @@ public class JobGraphBuilder {
                outEdgeList.get(upStreamVertexName).add(downStreamVertexName);
                outEdgeType.get(upStreamVertexName).add(typeNumber);
                inEdgeList.get(downStreamVertexName).add(upStreamVertexName);
-               connectionTypes.get(upStreamVertexName).add(partitionerObject);
+               outPartitioning.get(upStreamVertexName).add(partitionerObject);
                outEdgeNames.get(upStreamVertexName).add(outputNames);
                outEdgeSelectAll.get(upStreamVertexName).add(selectAll);
        }
@@ -602,11 +683,12 @@ public class JobGraphBuilder {
         * provided.
         */
        private void buildJobGraph() {
-               for (String vertexName : outEdgeList.keySet()) {
-                       createVertex(vertexName);
+
+               for (String sourceName : sources) {
+                       createVertex(sourceName);
                }
 
-               for (String upStreamVertexName : outEdgeList.keySet()) {
+               for (String upStreamVertexName : builtVertices) {
                        int i = 0;
 
                        List<Integer> outEdgeTypeList = 
outEdgeType.get(upStreamVertexName);
@@ -621,7 +703,7 @@ public class JobGraphBuilder {
                                
downStreamVertexConfig.setNumberOfInputs(inputNumber);
 
                                connect(upStreamVertexName, 
downStreamVertexName,
-                                               
connectionTypes.get(upStreamVertexName).get(i));
+                                               
outPartitioning.get(upStreamVertexName).get(i));
                                i++;
                        }
                }
@@ -631,4 +713,8 @@ public class JobGraphBuilder {
                setNumberOfJobOutputs();
        }
 
+       /**
+        * Enables or disables operator chaining for this builder. The flag is
+        * initialized to true and is read by isChainable, which never reports a
+        * pair of vertices as chainable while it is false.
+        * 
+        * @param chaining
+        *            true to allow chaining, false to forbid it
+        */
+       public void setChaining(boolean chaining) {
+               this.chaining = chaining;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 8837b85..22bcf92 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -39,6 +40,9 @@ public class StreamConfig {
        private static final String INPUT_TYPE = "inputType_";
        private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
        private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+       private static final String NUMBER_OF_CHAINED_TASKS = "numOfChained";
+       private static final String CHAINED_IN_SERIALIZER = 
"chainedSerializer_";
+       private static final String CHAINED_INVOKABLE = "chainedInvokable_";
        private static final String OUTPUT_NAME = "outputName_";
        private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
        private static final String PARTITIONER_OBJECT = "partitionerObject_";
@@ -46,8 +50,6 @@ public class StreamConfig {
        private static final String ITERATION_ID = "iteration-id";
        private static final String OUTPUT_SELECTOR = "outputSelector";
        private static final String DIRECTED_EMIT = "directedEmit";
-       private static final String FUNCTION_NAME = "operatorName";
-       private static final String VERTEX_NAME = "vertexName";
        private static final String SERIALIZEDUDF = "serializedudf";
        private static final String USER_FUNCTION = "userfunction";
        private static final String BUFFER_TIMEOUT = "bufferTimeout";
@@ -164,18 +166,6 @@ public class StreamConfig {
                }
        }
 
-       public void setVertexName(String vertexName) {
-               config.setString(VERTEX_NAME, vertexName);
-       }
-
-       public String getVertexName() {
-               return config.getString(VERTEX_NAME, null);
-       }
-
-       public String getFunctionName() {
-               return config.getString(FUNCTION_NAME, "");
-       }
-
        public void setDirectedEmit(boolean directedEmit) {
                config.setBoolean(DIRECTED_EMIT, directedEmit);
        }
@@ -261,7 +251,7 @@ public class StreamConfig {
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getOutputName(int outputIndex) {
+       public List<String> getOutputNames(int outputIndex) {
                return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
                                + outputIndex, null));
        }
@@ -316,4 +306,40 @@ public class StreamConfig {
                }
        }
 
+       /**
+        * Reads the number of chained tasks from the configuration, passing 0 as
+        * the default for the case that no value was stored.
+        * 
+        * @return The value stored by setNumberofChainedTasks
+        */
+       public int getNumberofChainedTasks() {
+               return config.getInteger(NUMBER_OF_CHAINED_TASKS, 0);
+       }
+
+       /**
+        * Stores the number of chained tasks in the configuration.
+        * 
+        * @param n
+        *            Number of chained tasks
+        */
+       public void setNumberofChainedTasks(int n) {
+               config.setInteger(NUMBER_OF_CHAINED_TASKS, n);
+       }
+
+       /**
+        * Reads the chained invokable stored under the given index from the
+        * configuration.
+        * 
+        * @param chainedTaskIndex
+        *            Index the invokable was stored under by setChainedInvokable
+        * @param cl
+        *            Class loader handed to InstantiationUtil for reading the object
+        * @return The invokable read from the configuration
+        * @throws RuntimeException
+        *             if the object cannot be read; the original exception is
+        *             attached as the cause
+        */
+       public ChainableInvokable<?, ?> getChainedInvokable(int chainedTaskIndex, ClassLoader cl) {
+               try {
+                       return (ChainableInvokable<?, ?>) InstantiationUtil.readObjectFromConfig(this.config,
+                                       CHAINED_INVOKABLE + chainedTaskIndex, cl);
+               } catch (Exception e) {
+                       // Keep the cause so the underlying failure stays visible in the stack trace
+                       throw new RuntimeException("Could not instantiate invokable.", e);
+               }
+       }
+
+       /**
+        * Reads the input serializer of the chained task stored under the given
+        * index from the configuration.
+        * 
+        * @param chainedTaskIndex
+        *            Index the serializer was stored under by setChainedSerializer
+        * @param cl
+        *            Class loader handed to InstantiationUtil for reading the object
+        * @return The serializer read from the configuration
+        * @throws RuntimeException
+        *             if the object cannot be read; the original exception is
+        *             attached as the cause
+        */
+       public StreamRecordSerializer<?> getChainedInSerializer(int chainedTaskIndex, ClassLoader cl) {
+               try {
+                       return (StreamRecordSerializer<?>) InstantiationUtil.readObjectFromConfig(this.config,
+                                       CHAINED_IN_SERIALIZER + chainedTaskIndex, cl);
+               } catch (Exception e) {
+                       // Keep the cause so the underlying failure stays visible in the stack trace
+                       throw new RuntimeException("Could not instantiate serializer.", e);
+               }
+       }
+
+       /**
+        * Serializes the given serializer and stores the bytes in the
+        * configuration under a key built from CHAINED_IN_SERIALIZER and the index.
+        * 
+        * @param typeWrapper
+        *            Serializer to store
+        * @param chainedTaskIndex
+        *            Index of the chained task the serializer belongs to
+        */
+       public void setChainedSerializer(StreamRecordSerializer<?> typeWrapper, 
int chainedTaskIndex) {
+               config.setBytes(CHAINED_IN_SERIALIZER + chainedTaskIndex,
+                               SerializationUtils.serialize(typeWrapper));
+       }
+
+       /**
+        * Serializes the given invokable and stores the bytes in the configuration
+        * under a key built from CHAINED_INVOKABLE and the index.
+        * 
+        * @param invokable
+        *            Invokable to store
+        * @param chainedTaskIndex
+        *            Index of the chained task the invokable belongs to
+        */
+       public void setChainedInvokable(ChainableInvokable<?, ?> invokable, int 
chainedTaskIndex) {
+               config.setBytes(CHAINED_INVOKABLE + chainedTaskIndex,
+                               SerializationUtils.serialize(invokable));
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index d782d08..b7e57e0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -37,7 +37,7 @@ public class CollectorWrapper<OUT> implements Collector<OUT> {
        @Override
        public void collect(OUT record) {
                for(Collector<OUT> output: outputs){
-                       output.collect(record);;
+                       output.collect(record);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index d7467d1..88feb5d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -67,7 +67,7 @@ public class IterativeDataStream<IN> extends
                                iterationTail.getParallelism(), waitTime);
 
                
jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), 
iterationTail.getId());
-               connectGraph(iterationTail, iterationSink.getId(), 0);
+               connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
                return iterationTail;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 5a8261e..7a0abe4 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -182,6 +182,11 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                return (SingleOutputStreamOperator<OUT, O>) super.distribute();
        }
 
+       /**
+        * Delegates to the superclass global() and returns the result cast to
+        * SingleOutputStreamOperator.
+        * 
+        * @return The result of super.global(), cast to this operator type
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       public SingleOutputStreamOperator<OUT, O> global() {
+               return (SingleOutputStreamOperator<OUT, O>) super.global();
+       }
+
        @Override
        public SingleOutputStreamOperator<OUT, O> copy() {
                return new SingleOutputStreamOperator<OUT, O>(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 87f2287..08fd19b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -434,7 +434,7 @@ public abstract class StreamExecutionEnvironment {
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, sourceName,
                                outTypeInfo, isParallel);
 
-               jobGraphBuilder.addStreamVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
+               jobGraphBuilder.addSourceVertex(returnStream.getId(), new 
SourceInvokable<OUT>(function),
                                null, outTypeInfo, sourceName, dop);
 
                return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
new file mode 100644
index 0000000..373b4e8
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+
+public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, 
OUT> implements
+               Collector<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       public ChainableInvokable(Function userFunction) {
+               super(userFunction);
+       }
+
+       public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> 
inSerializer) {
+               this.collector = collector;
+               this.inSerializer = inSerializer;
+               this.objectSerializer = inSerializer.getObjectSerializer();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 74591a8..13a6ba1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
+public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
        private static final long serialVersionUID = 1L;
 
        private SinkFunction<IN> sinkFunction;
@@ -38,7 +38,13 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
 
        @Override
        protected void callUserFunction() throws Exception {
-               sinkFunction.invoke((IN) nextRecord.getObject());
+               sinkFunction.invoke(nextObject);
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 87ad4e0..25e9221 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The StreamInvokable represents the base class for all invokables in the
  * streaming topology.
- *
+ * 
  * @param <OUT>
  *            The output type of the invokable
  */
@@ -52,6 +52,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
        protected StreamRecordSerializer<IN> inSerializer;
        protected TypeSerializer<IN> objectSerializer;
        protected StreamRecord<IN> nextRecord;
+       protected IN nextObject;
        protected boolean isMutable;
 
        protected Collector<OUT> collector;
@@ -92,7 +93,13 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
        protected StreamRecord<IN> readNext() {
                this.nextRecord = inSerializer.createInstance();
                try {
-                       return nextRecord = recordIterator.next(nextRecord);
+                       nextRecord = recordIterator.next(nextRecord);
+                       try {
+                               nextObject = nextRecord.getObject();
+                       } catch (NullPointerException e) {
+                               // end of stream
+                       }
+                       return nextRecord;
                } catch (IOException e) {
                        throw new RuntimeException("Could not read next 
record.");
                }
@@ -135,10 +142,14 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
         * RichFunction class
         * 
         */
-       public void close() throws Exception {
+       public void close() {
                isRunning = false;
                collector.close();
-               FunctionUtils.closeFunction(userFunction);
+               try {
+                       FunctionUtils.closeFunction(userFunction);
+               } catch (Exception e) {
+                       throw new RuntimeException("Error when closing the 
function: " + e.getMessage());
+               }
        }
 
        public void setRuntimeContext(RuntimeContext t) {
@@ -148,4 +159,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
        protected IN copy(IN record) {
                return objectSerializer.copy(record);
        }
+
+       public boolean isChainable() {
+               return this instanceof ChainableInvokable;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 0267253..3fc314c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
+public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
        private static final long serialVersionUID = 1L;
 
        Long count = 0L;
@@ -34,4 +34,10 @@ public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
                        collector.collect(++count);
                }
        }
+
+       @Override
+       public void collect(IN record) {
+               collector.collect(++count);
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 48b8ad0..0c8298e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
+public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
        private static final long serialVersionUID = 1L;
 
@@ -36,14 +36,20 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
        public void invoke() throws Exception {
                while (readNext() != null) {
                        callUserFunctionAndLogException();
-                       if (collect) {
-                               collector.collect(nextRecord.getObject());
-                       }
                }
        }
 
        @Override
        protected void callUserFunction() throws Exception {
-               collect = filterFunction.filter(copy(nextRecord.getObject()));
+               collect = filterFunction.filter(copy(nextObject));
+               if (collect) {
+                       collector.collect(nextObject);
+               }
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 8ff78eb..2a4081b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
        private static final long serialVersionUID = 1L;
 
        private FlatMapFunction<IN, OUT> flatMapper;
@@ -39,7 +39,13 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
        @Override
        protected void callUserFunction() throws Exception {
-               flatMapper.flatMap(nextRecord.getObject(), collector);
+               flatMapper.flatMap(nextObject, collector);
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index fdcf520..84258d6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -40,7 +40,7 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
        protected void reduce() throws Exception {
                Object key = nextRecord.getKey(keySelector);
                currentValue = values.get(key);
-               nextValue = nextRecord.getObject();
+               nextValue = nextObject;
                if (currentValue != null) {
                        callUserFunctionAndLogException();
                        values.put(key, reduced);
@@ -56,4 +56,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
                reduced = reducer.reduce(currentValue, nextValue);
        }
 
+       @Override
+       public boolean isChainable() {
+               return false;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 6be96ec..7c8e577 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
        private static final long serialVersionUID = 1L;
 
        private MapFunction<IN, OUT> mapper;
@@ -39,6 +39,12 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
        @Override
        protected void callUserFunction() throws Exception {
-               collector.collect(mapper.map(nextRecord.getObject()));
+               collector.collect(mapper.map(nextObject));
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 5f5cb12..e1a56cc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
+public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
        private static final long serialVersionUID = 1L;
 
        protected ReduceFunction<IN> reducer;
@@ -41,13 +41,15 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
        }
 
        protected void reduce() throws Exception {
-               nextValue = nextRecord.getObject();
                callUserFunctionAndLogException();
 
        }
 
        @Override
        protected void callUserFunction() throws Exception {
+
+               nextValue = nextObject;
+
                if (currentValue != null) {
                        currentValue = reducer.reduce(currentValue, nextValue);
                } else {
@@ -57,4 +59,11 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
                collector.collect(currentValue);
 
        }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index 03219b7..93f597f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -175,7 +175,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
        }
 
        @Override
-       public void close() throws Exception {
+       public void close() {
                if (!window.miniBatchEnd()) {
                        callUserFunctionAndLogException();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 61a6eb4..457f3f8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -17,26 +17,30 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.collector.StreamOutputWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.api.collector.StreamOutputWrapper;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
 public class OutputHandler<OUT> {
        private static final Logger LOG = 
LoggerFactory.getLogger(OutputHandler.class);
 
@@ -44,17 +48,21 @@ public class OutputHandler<OUT> {
        private StreamConfig configuration;
 
        private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
outputs;
-       private StreamOutputWrapper<OUT> collector;
-       private long bufferTimeout;
+       private Collector<OUT> endCollector;
 
        TypeInformation<OUT> outTypeInfo = null;
        StreamRecordSerializer<OUT> outSerializer = null;
        SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = 
null;
 
+       public List<ChainableInvokable<?, ?>> chainedInvokables;
+
+       private int numberOfChainedTasks;
+
        public OutputHandler(StreamVertex<?, OUT> streamComponent) {
                this.streamVertex = streamComponent;
                this.outputs = new 
LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
                this.configuration = new 
StreamConfig(streamComponent.getTaskConfiguration());
+               this.chainedInvokables = new ArrayList<ChainableInvokable<?, 
?>>();
 
                try {
                        setConfigOutputs();
@@ -62,6 +70,7 @@ public class OutputHandler<OUT> {
                        throw new StreamVertexException("Cannot register 
outputs for "
                                        + 
streamComponent.getClass().getSimpleName(), e);
                }
+
        }
 
        public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
getOutputs() {
@@ -69,18 +78,42 @@ public class OutputHandler<OUT> {
        }
 
        private void setConfigOutputs() {
-               setSerializers();
-               setCollector();
+               numberOfChainedTasks = configuration.getNumberofChainedTasks();
+               endCollector = createChainedOutputs(0);
+       }
 
-               int numberOfOutputs = configuration.getNumberOfOutputs();
-               bufferTimeout = configuration.getBufferTimeout();
+       @SuppressWarnings("unchecked")
+       private Collector<OUT> createChainedOutputs(int chainedTaskIndex) {
 
-               for (int i = 0; i < numberOfOutputs; i++) {
-                       setPartitioner(i, outputs);
+               if (numberOfChainedTasks == chainedTaskIndex) {
+                       return createEndCollector();
+               } else {
+                       CollectorWrapper<OUT> chainedCollector = new 
CollectorWrapper<OUT>();
+
+                       @SuppressWarnings("rawtypes")
+                       ChainableInvokable chainableInvokable = 
configuration.getChainedInvokable(
+                                       chainedTaskIndex, 
streamVertex.getUserCodeClassLoader());
+
+                       chainableInvokable.setup(
+                                       createChainedOutputs(chainedTaskIndex + 
1),
+                                       
configuration.getChainedInSerializer(chainedTaskIndex,
+                                                       
streamVertex.getUserCodeClassLoader()));
+
+                       chainedInvokables.add(chainableInvokable);
+
+                       chainedCollector.addChainedOutput((Collector<OUT>) 
chainableInvokable);
+
+                       return chainedCollector;
                }
+
        }
 
-       private StreamOutputWrapper<OUT> setCollector() {
+       private Collector<OUT> createEndCollector() {
+
+               setSerializers();
+
+               StreamOutputWrapper<OUT> collector;
+
                if (streamVertex.configuration.getDirectedEmit()) {
                        OutputSelector<OUT> outputSelector = 
streamVertex.configuration
                                        
.getOutputSelector(streamVertex.userClassLoader);
@@ -91,11 +124,17 @@ public class OutputHandler<OUT> {
                        collector = new 
StreamOutputWrapper<OUT>(streamVertex.getInstanceID(),
                                        outSerializationDelegate);
                }
+
+               int numberOfOutputs = configuration.getNumberOfOutputs();
+               for (int i = 0; i < numberOfOutputs; i++) {
+                       collector = (StreamOutputWrapper<OUT>) 
setPartitioner(i, collector);
+               }
+
                return collector;
        }
 
-       public StreamOutputWrapper<OUT> getCollector() {
-               return collector;
+       public Collector<OUT> getCollector() {
+               return endCollector;
        }
 
        void setSerializers() {
@@ -106,8 +145,7 @@ public class OutputHandler<OUT> {
                }
        }
 
-       void setPartitioner(int outputNumber,
-                       
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
+       Collector<OUT> setPartitioner(int outputNumber, 
StreamOutputWrapper<OUT> endCollector) {
                StreamPartitioner<OUT> outputPartitioner = null;
 
                try {
@@ -121,6 +159,8 @@ public class OutputHandler<OUT> {
 
                RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
+               long bufferTimeout = configuration.getBufferTimeout();
+
                if (bufferTimeout >= 0) {
                        output = new 
StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex
                                        
.getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
@@ -139,18 +179,20 @@ public class OutputHandler<OUT> {
                }
 
                outputs.add(output);
-               List<String> outputName = 
configuration.getOutputName(outputNumber);
+               List<String> outputNames = 
configuration.getOutputNames(outputNumber);
                boolean isSelectAllOutput = 
configuration.getSelectAll(outputNumber);
 
-               if (collector != null) {
-                       collector
-                                       .addOutput(new 
StreamOutput<OUT>(output, isSelectAllOutput ? null : outputName));
+               if (endCollector != null) {
+                       endCollector.addOutput(new StreamOutput<OUT>(output, 
isSelectAllOutput ? null
+                                       : outputNames));
                }
 
                if (LOG.isTraceEnabled()) {
                        LOG.trace("Partitioner set: {} with {} outputs for {}", 
outputPartitioner.getClass()
                                        .getSimpleName(), outputNumber, 
streamVertex.getClass().getSimpleName());
                }
+
+               return endCollector;
        }
 
        public void flushOutputs() throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 93e290f..994b1fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -35,11 +36,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
        protected StreamConfig configuration;
        protected int instanceID;
-       protected String name;
        private static int numVertices = 0;
 
-       protected String functionName;
-
        private InputHandler<IN> inputHandler;
        protected OutputHandler<OUT> outputHandler;
        private StreamInvokable<IN, OUT> userInvokable;
@@ -70,17 +68,26 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
        protected void initialize() {
                this.userClassLoader = getUserCodeClassLoader();
                this.configuration = new StreamConfig(getTaskConfiguration());
-               this.name = configuration.getVertexName();
-               this.functionName = configuration.getFunctionName();
                this.states = configuration.getOperatorStates(userClassLoader);
-               this.context = createRuntimeContext(name, this.states);
+               this.context = 
createRuntimeContext(getEnvironment().getTaskName(), this.states);
        }
 
        protected <T> void invokeUserFunction(StreamInvokable<?, T> 
userInvokable) throws Exception {
                userInvokable.setRuntimeContext(context);
                userInvokable.open(getTaskConfiguration());
+
+               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
+                       invokable.setRuntimeContext(context);
+                       invokable.open(getTaskConfiguration());
+               }
+
                userInvokable.invoke();
                userInvokable.close();
+
+               for (ChainableInvokable<?, ?> invokable : 
outputHandler.chainedInvokables) {
+                       invokable.close();
+               }
+
        }
 
        public void setInputsOutputs() {
@@ -94,14 +101,15 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
        }
 
        public String getName() {
-               return name;
+               return getEnvironment().getTaskName();
        }
 
        public int getInstanceID() {
                return instanceID;
        }
 
-       public StreamingRuntimeContext createRuntimeContext(String taskName, 
Map<String, OperatorState<?>> states) {
+       public StreamingRuntimeContext createRuntimeContext(String taskName,
+                       Map<String, OperatorState<?>> states) {
                Environment env = getEnvironment();
                return new StreamingRuntimeContext(taskName, env, 
getUserCodeClassLoader(), states);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index a5dc68a..998e818 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -76,8 +76,7 @@ public class IterateExample {
 
                // apply the step function to add new random value to the tuple 
and to
                // increment the counter and split the output with the output 
selector
-               SplitDataStream<Tuple2<Double, Integer>> step = it.map(new 
Step()).shuffle()
-                               .split(new MySelector());
+               SplitDataStream<Tuple2<Double, Integer>> step = it.map(new 
Step()).split(new MySelector());
 
                // close the iteration by selecting the tuples that were 
directed to the
                // 'iterate' channel in the output selector

Reply via email to