[streaming] JobGraphbuilder separated to StreamGraph and 
StreamingJobGraphGenerator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b608ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b608ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b608ce

Branch: refs/heads/master
Commit: e3b608ce20f2c06b6c9ce8f231b6f53ab73f1de5
Parents: 7dbb55e
Author: Gyula Fora <[email protected]>
Authored: Wed Jan 21 00:34:11 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 675 -------------------
 .../flink/streaming/api/StreamConfig.java       |  28 +-
 .../apache/flink/streaming/api/StreamGraph.java | 545 +++++++++++++++
 .../api/StreamingJobGraphGenerator.java         | 286 ++++++++
 .../api/datastream/ConnectedDataStream.java     |   8 +-
 .../streaming/api/datastream/DataStream.java    |  18 +-
 .../api/datastream/IterativeDataStream.java     |   4 +-
 .../datastream/SingleOutputStreamOperator.java  |  20 +-
 .../temporaloperator/StreamCrossOperator.java   |   4 +-
 .../temporaloperator/StreamJoinOperator.java    |   2 +-
 .../api/environment/LocalStreamEnvironment.java |   4 +-
 .../environment/RemoteStreamEnvironment.java    |   4 +-
 .../environment/StreamContextEnvironment.java   |   4 +-
 .../environment/StreamExecutionEnvironment.java |  18 +-
 .../operator/GroupedReduceInvokable.java        |   1 -
 .../operator/GroupedWindowInvokable.java        |   3 +-
 .../api/invokable/operator/WindowInvokable.java |   3 +-
 .../api/streamvertex/OutputHandler.java         |  64 +-
 .../streaming/util/TestStreamEnvironment.java   |   2 +-
 .../api/scala/StreamCrossOperator.scala         |   4 +-
 .../api/scala/StreamJoinOperator.scala          |   2 +-
 21 files changed, 914 insertions(+), 785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
deleted file mode 100644
index 6ae97c9..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ /dev/null
@@ -1,675 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.api.streamvertex.StreamVertex;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import 
org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
-import org.apache.flink.streaming.state.OperatorState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Object for building Apache Flink stream processing job graphs
- */
-public class JobGraphBuilder {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(JobGraphBuilder.class);
-       private final static String DEAFULT_JOB_NAME = "Streaming Job";
-       private JobGraph jobGraph;
-
-       private boolean chaining = true;
-
-       // Graph attributes
-       private Map<String, AbstractJobVertex> streamVertices;
-       private Map<String, Integer> vertexParallelism;
-       private Map<String, Long> bufferTimeout;
-       private Map<String, List<String>> outEdgeList;
-       private Map<String, List<Integer>> outEdgeIndex;
-       private Map<String, List<List<String>>> outEdgeNames;
-       private Map<String, List<Boolean>> outEdgeSelectAll;
-       private Map<String, List<String>> inEdgeList;
-       private Map<String, List<StreamPartitioner<?>>> outPartitioning;
-       private Map<String, String> operatorNames;
-       private Map<String, StreamInvokable<?, ?>> invokableObjects;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-       private Map<String, byte[]> outputSelectors;
-       private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
-       private Map<String, Integer> iterationIds;
-       private Map<Integer, String> iterationIDtoHeadName;
-       private Map<Integer, String> iterationIDtoTailName;
-       private Map<String, Integer> iterationTailCount;
-       private Map<String, Long> iterationWaitTime;
-       private Map<String, Map<String, OperatorState<?>>> operatorStates;
-       private Map<String, InputFormat<String, ?>> inputFormatList;
-       private Map<String, Map<String, StreamConfig>> chainedConfigs;
-       private Map<String, StreamConfig> vertexConfigs;
-
-       private Set<String> sources;
-       private Set<String> builtNodes;
-
-       /**
-        * Creates an new {@link JobGraph} with the given name. A JobGraph is a 
DAG
-        * and consists of sources, tasks (intermediate vertices) and sinks.
-        */
-       public JobGraphBuilder() {
-
-               initGraph();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("JobGraph created");
-               }
-       }
-
-       public void initGraph() {
-               streamVertices = new HashMap<String, AbstractJobVertex>();
-               vertexParallelism = new HashMap<String, Integer>();
-               bufferTimeout = new HashMap<String, Long>();
-               outEdgeList = new HashMap<String, List<String>>();
-               outEdgeIndex = new HashMap<String, List<Integer>>();
-               outEdgeNames = new HashMap<String, List<List<String>>>();
-               outEdgeSelectAll = new HashMap<String, List<Boolean>>();
-               inEdgeList = new HashMap<String, List<String>>();
-               outPartitioning = new HashMap<String, 
List<StreamPartitioner<?>>>();
-               operatorNames = new HashMap<String, String>();
-               invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
-               typeSerializersIn1 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               outputSelectors = new HashMap<String, byte[]>();
-               vertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
-               iterationIds = new HashMap<String, Integer>();
-               iterationIDtoHeadName = new HashMap<Integer, String>();
-               iterationIDtoTailName = new HashMap<Integer, String>();
-               iterationTailCount = new HashMap<String, Integer>();
-               iterationWaitTime = new HashMap<String, Long>();
-               operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
-               inputFormatList = new HashMap<String, InputFormat<String, ?>>();
-               chainedConfigs = new HashMap<String, Map<String, 
StreamConfig>>();
-               vertexConfigs = new HashMap<String, StreamConfig>();
-
-               sources = new HashSet<String>();
-               builtNodes = new HashSet<String>();
-       }
-
-       /**
-        * Adds a vertex to the streaming JobGraph with the given parameters
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param invokableObject
-        *            User defined operator
-        * @param inTypeInfo
-        *            Input type for serialization
-        * @param outTypeInfo
-        *            Output type for serialization
-        * @param operatorName
-        *            Operator type
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       public <IN, OUT> void addStreamVertex(String vertexName,
-                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
-                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
-
-               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
-
-               StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
-                               inTypeInfo) : null;
-               StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null 
? new StreamRecordSerializer<OUT>(
-                               outTypeInfo) : null;
-
-               addTypeSerializers(vertexName, inSerializer, null, 
outSerializer, null);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Vertex: {}", vertexName);
-               }
-       }
-
-       public <IN, OUT> void addSourceVertex(String vertexName,
-                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
-                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
-               addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
-                               parallelism);
-               sources.add(vertexName);
-       }
-
-       /**
-        * Adds a vertex for the iteration head to the {@link JobGraph}. The
-        * iterated values will be fed from this vertex back to the graph.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param iterationHead
-        *            Id of the iteration head
-        * @param iterationID
-        *            ID of iteration for multiple iterations
-        * @param parallelism
-        *            Number of parallel instances created
-        * @param waitTime
-        *            Max wait time for next record
-        */
-       public void addIterationHead(String vertexName, String iterationHead, 
Integer iterationID,
-                       int parallelism, long waitTime) {
-
-               addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
-
-               chaining = false;
-
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoHeadName.put(iterationID, vertexName);
-
-               setSerializersFrom(iterationHead, vertexName);
-
-               setEdge(vertexName, iterationHead, outPartitioning
-                               
.get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(),
-                               false);
-
-               iterationWaitTime.put(iterationIDtoHeadName.get(iterationID), 
waitTime);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SOURCE: {}", vertexName);
-               }
-
-               sources.add(vertexName);
-       }
-
-       /**
-        * Adds a vertex for the iteration tail to the {@link JobGraph}. The 
values
-        * intended to be iterated will be sent to this sink from the iteration
-        * head.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param iterationTail
-        *            Id of the iteration tail
-        * @param iterationID
-        *            ID of iteration for mulitple iterations
-        * @param parallelism
-        *            Number of parallel instances created
-        * @param waitTime
-        *            Max waiting time for next record
-        */
-       public void addIterationTail(String vertexName, String iterationTail, 
Integer iterationID,
-                       int parallelism, long waitTime) {
-
-               if (bufferTimeout.get(iterationTail) == 0) {
-                       throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
-               }
-
-               addVertex(vertexName, StreamIterationTail.class, null, null, 
parallelism);
-
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoTailName.put(iterationID, vertexName);
-
-               setSerializersFrom(iterationTail, vertexName);
-               iterationWaitTime.put(iterationIDtoTailName.get(iterationID), 
waitTime);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SINK: {}", vertexName);
-               }
-
-       }
-
-       public <IN1, IN2, OUT> void addCoTask(String vertexName,
-                       CoInvokable<IN1, IN2, OUT> taskInvokableObject, 
TypeInformation<IN1> in1TypeInfo,
-                       TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo,
-                       String operatorName, int parallelism) {
-
-               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
-
-               addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo),
-                               new StreamRecordSerializer<IN2>(in2TypeInfo), 
new StreamRecordSerializer<OUT>(
-                                               outTypeInfo), null);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("CO-TASK: {}", vertexName);
-               }
-       }
-
-       /**
-        * Sets vertex parameters in the JobGraph
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param vertexClass
-        *            The class of the vertex
-        * @param invokableObjectject
-        *            The user defined invokable object
-        * @param operatorName
-        *            Type of the user defined operator
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       private void addVertex(String vertexName, Class<? extends 
AbstractInvokable> vertexClass,
-                       StreamInvokable<?, ?> invokableObject, String 
operatorName, int parallelism) {
-
-               vertexClasses.put(vertexName, vertexClass);
-               setParallelism(vertexName, parallelism);
-               invokableObjects.put(vertexName, invokableObject);
-               operatorNames.put(vertexName, operatorName);
-               outEdgeList.put(vertexName, new ArrayList<String>());
-               outEdgeIndex.put(vertexName, new ArrayList<Integer>());
-               outEdgeNames.put(vertexName, new ArrayList<List<String>>());
-               outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>());
-               inEdgeList.put(vertexName, new ArrayList<String>());
-               outPartitioning.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
-               iterationTailCount.put(vertexName, 0);
-       }
-
-       private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
-                       StreamRecordSerializer<?> in2, 
StreamRecordSerializer<?> out1,
-                       StreamRecordSerializer<?> out2) {
-               typeSerializersIn1.put(vertexName, in1);
-               typeSerializersIn2.put(vertexName, in2);
-               typeSerializersOut1.put(vertexName, out1);
-               typeSerializersOut2.put(vertexName, out2);
-       }
-
-       private List<Tuple2<String, String>> createChain(String startNode, 
String current) {
-
-               if (!builtNodes.contains(startNode)) {
-
-                       List<Tuple2<String, String>> transitiveOutEdges = new 
ArrayList<Tuple2<String, String>>();
-                       List<String> chainableOutputs = new ArrayList<String>();
-                       List<String> nonChainableOutputs = new 
ArrayList<String>();
-
-                       for (String outName : outEdgeList.get(current)) {
-                               if (isChainable(current, outName)) {
-                                       chainableOutputs.add(outName);
-                               } else {
-                                       nonChainableOutputs.add(outName);
-                               }
-
-                       }
-
-                       for (String chainable : chainableOutputs) {
-                               
transitiveOutEdges.addAll(createChain(startNode, chainable));
-                       }
-
-                       for (String nonChainable : nonChainableOutputs) {
-                               transitiveOutEdges.add(new Tuple2<String, 
String>(current, nonChainable));
-                               
transitiveOutEdges.addAll(createChain(nonChainable, nonChainable));
-                       }
-
-                       StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode)
-                                       : new StreamConfig(new Configuration());
-
-                       setVertexConfig(current, config, chainableOutputs, 
nonChainableOutputs);
-
-                       if (current.equals(startNode)) {
-
-                               config.setChainStart();
-                               config.setRecordWriterOrder(transitiveOutEdges);
-
-                               for (Tuple2<String, String> edge : 
transitiveOutEdges) {
-                                       connect(startNode, edge);
-                               }
-
-                               
vertexConfigs.get(startNode).setTransitiveChainedTaskConfigs(
-                                               chainedConfigs.get(startNode));
-
-                       } else {
-
-                               Map<String, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
-
-                               if (chainedConfs == null) {
-                                       chainedConfigs.put(startNode, new 
HashMap<String, StreamConfig>());
-                               }
-                               chainedConfigs.get(startNode).put(current, 
config);
-                       }
-
-                       return transitiveOutEdges;
-
-               } else {
-                       return new ArrayList<Tuple2<String, String>>();
-               }
-       }
-
-       private StreamConfig createProcessingVertex(String vertexName) {
-
-               AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
-
-               this.jobGraph.addVertex(vertex);
-
-               int parallelism = vertexParallelism.get(vertexName);
-
-               vertex.setInvokableClass(vertexClasses.get(vertexName));
-               vertex.setParallelism(parallelism);
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Parallelism set: {} for {}", parallelism, 
vertexName);
-               }
-
-               if (inputFormatList.containsKey(vertexName)) {
-                       
vertex.setInputSplitSource(inputFormatList.get(vertexName));
-               }
-
-               streamVertices.put(vertexName, vertex);
-               builtNodes.add(vertexName);
-
-               return new StreamConfig(vertex.getConfiguration());
-       }
-
-       private void setVertexConfig(String vertexName, StreamConfig config,
-                       List<String> chainableOutputs, List<String> 
nonChainableOutputs) {
-
-               StreamInvokable<?, ?> invokableObject = 
invokableObjects.get(vertexName);
-               byte[] outputSelector = outputSelectors.get(vertexName);
-               Class<? extends AbstractInvokable> vertexClass = 
vertexClasses.get(vertexName);
-               Map<String, OperatorState<?>> state = 
operatorStates.get(vertexName);
-
-               config.setVertexName(vertexName);
-
-               config.setBufferTimeout(bufferTimeout.get(vertexName));
-
-               config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
-               config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
-               
config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
-               
config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
-
-               config.setUserInvokable(invokableObject);
-               config.setOutputSelector(outputSelector);
-               config.setOperatorStates(state);
-
-               config.setNumberOfOutputs(nonChainableOutputs.size());
-               config.setOutputs(nonChainableOutputs);
-               config.setChainedOutputs(chainableOutputs);
-
-               if (vertexClass.equals(StreamIterationHead.class)
-                               || 
vertexClass.equals(StreamIterationTail.class)) {
-                       config.setIterationId(iterationIds.get(vertexName));
-                       
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
-               }
-
-               vertexConfigs.put(vertexName, config);
-       }
-
-       private boolean isChainable(String vertexName, String outName) {
-               return inEdgeList.get(outName).size() == 1
-                               && invokableObjects.get(outName) != null
-                               && outputSelectors.get(vertexName) == null
-                               && 
invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS
-                               && 
(invokableObjects.get(vertexName).getChainingStrategy() == 
ChainingStrategy.HEAD || invokableObjects
-                                               
.get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS)
-                               && outPartitioning.get(vertexName)
-                                               
.get(outEdgeList.get(vertexName).indexOf(outName)).getStrategy() == 
PartitioningStrategy.FORWARD
-                               && vertexParallelism.get(vertexName) == 
vertexParallelism.get(outName) && chaining;
-       }
-
-       private <T> void connect(String headOfChain, Tuple2<String, String> 
edge) {
-
-               String upStreamVertexName = edge.f0;
-               String downStreamVertexName = edge.f1;
-
-               int outputIndex = 
outEdgeList.get(upStreamVertexName).indexOf(downStreamVertexName);
-
-               AbstractJobVertex headVertex = streamVertices.get(headOfChain);
-               AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamVertexName);
-
-               StreamConfig downStreamConfig = new 
StreamConfig(downStreamVertex.getConfiguration());
-               StreamConfig upStreamConfig = new 
StreamConfig(headVertex.getConfiguration());
-
-               List<Integer> outEdgeIndexList = 
outEdgeIndex.get(upStreamVertexName);
-               int numOfInputs = downStreamConfig.getNumberOfInputs();
-
-               downStreamConfig.setInputIndex(numOfInputs++, 
outEdgeIndexList.get(outputIndex));
-               downStreamConfig.setNumberOfInputs(numOfInputs);
-
-               StreamPartitioner<?> partitionerObject = 
outPartitioning.get(upStreamVertexName).get(
-                               outputIndex);
-
-               upStreamConfig.setPartitioner(downStreamVertexName, 
partitionerObject);
-
-               if (partitionerObject.getStrategy() == 
PartitioningStrategy.FORWARD) {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
-               } else {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.ALL_TO_ALL);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("CONNECTED: {} - {} -> {}", 
partitionerObject.getClass().getSimpleName(),
-                                       headOfChain, downStreamVertexName);
-               }
-
-               upStreamConfig.setOutputNames(downStreamVertexName, 
outEdgeNames.get(upStreamVertexName)
-                               .get(outputIndex));
-               upStreamConfig.setSelectAll(downStreamVertexName, 
outEdgeSelectAll.get(upStreamVertexName)
-                               .get(outputIndex));
-       }
-
-       /**
-        * Sets the number of parallel instances created for the given vertex.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       public void setParallelism(String vertexName, int parallelism) {
-               vertexParallelism.put(vertexName, parallelism);
-       }
-
-       /**
-        * Sets the input format for the given vertex.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param inputFormat
-        *            input format of the file source associated with the given
-        *            vertex
-        */
-       public void setInputFormat(String vertexName, InputFormat<String, ?> 
inputFormat) {
-               inputFormatList.put(vertexName, inputFormat);
-       }
-
-       public void setBufferTimeout(String vertexName, long bufferTimeout) {
-               this.bufferTimeout.put(vertexName, bufferTimeout);
-       }
-
-       public void addOperatorState(String veretxName, String stateName, 
OperatorState<?> state) {
-               Map<String, OperatorState<?>> states = 
operatorStates.get(veretxName);
-               if (states == null) {
-                       states = new HashMap<String, OperatorState<?>>();
-                       states.put(stateName, state);
-               } else {
-                       if (states.containsKey(stateName)) {
-                               throw new RuntimeException("State has already 
been registered with this name: "
-                                               + stateName);
-                       } else {
-                               states.put(stateName, state);
-                       }
-               }
-               operatorStates.put(veretxName, states);
-       }
-
-       /**
-        * Connects two vertices in the JobGraph using the selected partitioner
-        * settings
-        * 
-        * @param upStreamVertexName
-        *            Name of the upstream(output) vertex
-        * @param downStreamVertexName
-        *            Name of the downstream(input) vertex
-        * @param partitionerObject
-        *            Partitioner object
-        * @param typeNumber
-        *            Number of the type (used at co-functions)
-        * @param outputNames
-        *            User defined names of the out edge
-        */
-       public void setEdge(String upStreamVertexName, String 
downStreamVertexName,
-                       StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames,
-                       boolean selectAll) {
-               outEdgeList.get(upStreamVertexName).add(downStreamVertexName);
-               outEdgeIndex.get(upStreamVertexName).add(typeNumber);
-               inEdgeList.get(downStreamVertexName).add(upStreamVertexName);
-               outPartitioning.get(upStreamVertexName).add(partitionerObject);
-               outEdgeNames.get(upStreamVertexName).add(outputNames);
-               outEdgeSelectAll.get(upStreamVertexName).add(selectAll);
-       }
-
-       /**
-        * Sets the parallelism and buffertimeout of the iteration head of the 
given
-        * iteration id to the parallelism given.
-        * 
-        * @param iterationID
-        *            ID of the iteration
-        * @param iterationTail
-        *            ID of the iteration tail
-        */
-       public void setIterationSourceSettings(String iterationID, String 
iterationTail) {
-               setParallelism(iterationIDtoHeadName.get(iterationID), 
vertexParallelism.get(iterationTail));
-               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeout.get(iterationTail));
-       }
-
-       /**
-        * Sets a user defined {@link OutputSelector} for the given vertex. 
Used for
-        * directed emits.
-        * 
-        * @param vertexName
-        *            Name of the vertex for which the output selector will be 
set
-        * @param serializedOutputSelector
-        *            Byte array representing the serialized output selector.
-        */
-       public <T> void setOutputSelector(String vertexName, byte[] 
serializedOutputSelector) {
-               outputSelectors.put(vertexName, serializedOutputSelector);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Outputselector set for {}", vertexName);
-               }
-
-       }
-
-       public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> 
invokableObject) {
-               invokableObjects.put(id, invokableObject);
-       }
-
-       public StreamInvokable<?, ?> getInvokable(String id) {
-               return invokableObjects.get(id);
-       }
-
-       public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
-               StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType);
-               typeSerializersOut1.put(id, serializer);
-       }
-
-       /**
-        * Sets TypeSerializerWrapper from one vertex to another, used with some
-        * sinks.
-        * 
-        * @param from
-        *            from
-        * @param to
-        *            to
-        */
-       public void setSerializersFrom(String from, String to) {
-               operatorNames.put(to, operatorNames.get(from));
-
-               typeSerializersIn1.put(to, typeSerializersOut1.get(from));
-               typeSerializersIn2.put(to, typeSerializersOut2.get(from));
-               typeSerializersOut1.put(to, typeSerializersOut1.get(from));
-               typeSerializersOut2.put(to, typeSerializersOut2.get(from));
-       }
-
-       /**
-        * Sets slot sharing for the vertices.
-        */
-       private void setSlotSharing() {
-               SlotSharingGroup shareGroup = new SlotSharingGroup();
-
-               for (AbstractJobVertex vertex : streamVertices.values()) {
-                       vertex.setSlotSharingGroup(shareGroup);
-               }
-
-               for (Integer iterID : new 
HashSet<Integer>(iterationIds.values())) {
-                       CoLocationGroup ccg = new CoLocationGroup();
-                       AbstractJobVertex tail = 
streamVertices.get(iterationIDtoTailName.get(iterID));
-                       AbstractJobVertex head = 
streamVertices.get(iterationIDtoHeadName.get(iterID));
-
-                       ccg.addVertex(head);
-                       ccg.addVertex(tail);
-               }
-       }
-
-       /**
-        * Gets the assembled {@link JobGraph} and adds a default name for it.
-        */
-       public JobGraph getJobGraph() {
-               return getJobGraph(DEAFULT_JOB_NAME);
-       }
-
-       /**
-        * Gets the assembled {@link JobGraph} and adds a user specified name 
for
-        * it.
-        * 
-        * @param jobGraphName
-        *            name of the jobGraph
-        */
-       public JobGraph getJobGraph(String jobGraphName) {
-               jobGraph = new JobGraph(jobGraphName);
-               buildJobGraph();
-               return jobGraph;
-       }
-
-       /**
-        * Builds the {@link JobGraph} from the vertices with the edges and 
settings
-        * provided.
-        */
-       private void buildJobGraph() {
-
-               for (String sourceName : sources) {
-                       createChain(sourceName, sourceName);
-               }
-
-               setSlotSharing();
-       }
-
-       public void setChaining(boolean chaining) {
-               this.chaining = chaining;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 6fffaa6..213a892 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -62,7 +62,7 @@ public class StreamConfig implements Serializable {
        private static final String TYPE_SERIALIZER_OUT_2 = 
"typeSerializer_out_2";
        private static final String ITERATON_WAIT = "iterationWait";
        private static final String OUTPUTS = "outVertexNames";
-       private static final String RW_ORDER = "rwOrder";
+       private static final String EDGES_IN_ORDER = "rwOrder";
 
        // DEFAULT VALUES
 
@@ -186,10 +186,10 @@ public class StreamConfig implements Serializable {
                return config.getBoolean(DIRECTED_EMIT, false);
        }
 
-       public void setOutputSelector(byte[] outputSelector) {
+       public void setOutputSelector(OutputSelector<?> outputSelector) {
                if (outputSelector != null) {
                        setDirectedEmit(true);
-                       config.setBytes(OUTPUT_SELECTOR, outputSelector);
+                       config.setBytes(OUTPUT_SELECTOR, 
SerializationUtils.serialize(outputSelector));
                }
        }
 
@@ -293,20 +293,16 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       public void setRecordWriterOrder(List<Tuple2<String, String>> 
outEdgeList) {
+       public void setOutEdgesInOrder(List<Tuple2<String, String>> 
outEdgeList) {
 
-               List<String> outVertices = new ArrayList<String>();
-               for (Tuple2<String, String> edge : outEdgeList) {
-                       outVertices.add(edge.f1);
-               }
-
-               config.setBytes(RW_ORDER, 
SerializationUtils.serialize((Serializable) outVertices));
+               config.setBytes(EDGES_IN_ORDER, 
SerializationUtils.serialize((Serializable) outEdgeList));
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getRecordWriterOrder(ClassLoader cl) {
+       public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) {
                try {
-                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config, RW_ORDER, cl);
+                       return (List<Tuple2<String, String>>) 
InstantiationUtil.readObjectFromConfig(
+                                       this.config, EDGES_IN_ORDER, cl);
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
outputs.");
                }
@@ -358,8 +354,10 @@ public class StreamConfig implements Serializable {
        public Map<String, StreamConfig> 
getTransitiveChainedTaskConfigs(ClassLoader cl) {
                try {
 
-                       return (Map<String, StreamConfig>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       CHAINED_TASK_CONFIG, cl);
+                       Map<String, StreamConfig> confs = (Map<String, 
StreamConfig>) InstantiationUtil
+                                       .readObjectFromConfig(this.config, 
CHAINED_TASK_CONFIG, cl);
+
+                       return confs == null ? new HashMap<String, 
StreamConfig>() : confs;
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
configuration.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
new file mode 100644
index 0000000..3dd1bdb
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
+import org.apache.flink.streaming.api.streamvertex.StreamVertex;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.state.OperatorState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Object for building Apache Flink stream processing graphs
+ */
+public class StreamGraph {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
+       private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
+
+       protected boolean chaining = true;
+
+       // Graph attributes
+       private Map<String, Integer> operatorParallelisms;
+       private Map<String, Long> bufferTimeouts;
+       private Map<String, List<String>> outEdgeLists;
+       private Map<String, List<Integer>> outEdgeTypes;
+       private Map<String, List<List<String>>> selectedNames;
+       private Map<String, List<Boolean>> outEdgeSelectAlls;
+       private Map<String, List<String>> inEdgeLists;
+       private Map<String, List<StreamPartitioner<?>>> outputPartitioners;
+       private Map<String, String> operatorNames;
+       private Map<String, StreamInvokable<?, ?>> invokableObjects;
+       private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
+       private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
+       private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
+       private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
+       private Map<String, OutputSelector<?>> outputSelectors;
+       private Map<String, Class<? extends AbstractInvokable>> 
jobVertexClasses;
+       private Map<String, Integer> iterationIds;
+       private Map<Integer, String> iterationIDtoHeadName;
+       private Map<Integer, String> iterationIDtoTailName;
+       private Map<String, Integer> iterationTailCount;
+       private Map<String, Long> iterationTimeouts;
+       private Map<String, Map<String, OperatorState<?>>> operatorStates;
+       private Map<String, InputFormat<String, ?>> inputFormatLists;
+
+       private Set<String> sources;
+
+       public StreamGraph() {
+
+               initGraph();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("StreamGraph created");
+               }
+       }
+
+       public void initGraph() {
+               operatorParallelisms = new HashMap<String, Integer>();
+               bufferTimeouts = new HashMap<String, Long>();
+               outEdgeLists = new HashMap<String, List<String>>();
+               outEdgeTypes = new HashMap<String, List<Integer>>();
+               selectedNames = new HashMap<String, List<List<String>>>();
+               outEdgeSelectAlls = new HashMap<String, List<Boolean>>();
+               inEdgeLists = new HashMap<String, List<String>>();
+               outputPartitioners = new HashMap<String, 
List<StreamPartitioner<?>>>();
+               operatorNames = new HashMap<String, String>();
+               invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
+               typeSerializersIn1 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               outputSelectors = new HashMap<String, OutputSelector<?>>();
+               jobVertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
+               iterationIds = new HashMap<String, Integer>();
+               iterationIDtoHeadName = new HashMap<Integer, String>();
+               iterationIDtoTailName = new HashMap<Integer, String>();
+               iterationTailCount = new HashMap<String, Integer>();
+               iterationTimeouts = new HashMap<String, Long>();
+               operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
+               inputFormatLists = new HashMap<String, InputFormat<String, 
?>>();
+               sources = new HashSet<String>();
+       }
+
+       /**
+        * Adds a vertex to the streaming graph with the given parameters
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param invokableObject
+        *            User defined operator
+        * @param inTypeInfo
+        *            Input type for serialization
+        * @param outTypeInfo
+        *            Output type for serialization
+        * @param operatorName
+        *            Operator type
+        * @param parallelism
+        *            Number of parallel instances created
+        */
+       public <IN, OUT> void addStreamVertex(String vertexName,
+                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
+                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
+
+               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
+
+               StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
+                               inTypeInfo) : null;
+               StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null 
? new StreamRecordSerializer<OUT>(
+                               outTypeInfo) : null;
+
+               addTypeSerializers(vertexName, inSerializer, null, 
outSerializer, null);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Vertex: {}", vertexName);
+               }
+       }
+
+       public <IN, OUT> void addSourceVertex(String vertexName,
+                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
+                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
+               addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
+                               parallelism);
+               sources.add(vertexName);
+       }
+
+       /**
+        * Adds a vertex for the iteration head to the stream graph. The
+        * iterated values will be fed from this vertex back to the graph.
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param iterationHead
+        *            Id of the iteration head
+        * @param iterationID
+        *            ID of iteration for multiple iterations
+        * @param parallelism
+        *            Number of parallel instances created
+        * @param waitTime
+        *            Max wait time for next record
+        */
+       public void addIterationHead(String vertexName, String iterationHead, 
Integer iterationID,
+                       int parallelism, long waitTime) {
+
+               addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
+
+               chaining = false;
+
+               iterationIds.put(vertexName, iterationID);
+               iterationIDtoHeadName.put(iterationID, vertexName);
+
+               setSerializersFrom(iterationHead, vertexName);
+
+               setEdge(vertexName, iterationHead,
+                               
outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
+                               new ArrayList<String>(), false);
+
+               iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), 
waitTime);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("ITERATION SOURCE: {}", vertexName);
+               }
+
+               sources.add(vertexName);
+       }
+
+       /**
+        * Adds a vertex for the iteration tail to the stream graph. The 
values
+        * intended to be iterated will be sent to this sink from the iteration
+        * head.
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param iterationTail
+        *            Id of the iteration tail
+        * @param iterationID
+        *            ID of iteration for multiple iterations
+        * @param parallelism
+        *            Number of parallel instances created
+        * @param waitTime
+        *            Max waiting time for next record
+        */
+       public void addIterationTail(String vertexName, String iterationTail, 
Integer iterationID,
+                       int parallelism, long waitTime) {
+
+               if (bufferTimeouts.get(iterationTail) == 0) {
+                       throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
+               }
+
+               addVertex(vertexName, StreamIterationTail.class, null, null, 
parallelism);
+
+               iterationIds.put(vertexName, iterationID);
+               iterationIDtoTailName.put(iterationID, vertexName);
+
+               setSerializersFrom(iterationTail, vertexName);
+               iterationTimeouts.put(iterationIDtoTailName.get(iterationID), 
waitTime);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("ITERATION SINK: {}", vertexName);
+               }
+
+       }
+
+       public <IN1, IN2, OUT> void addCoTask(String vertexName,
+                       CoInvokable<IN1, IN2, OUT> taskInvokableObject, 
TypeInformation<IN1> in1TypeInfo,
+                       TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo,
+                       String operatorName, int parallelism) {
+
+               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
+
+               addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo),
+                               new StreamRecordSerializer<IN2>(in2TypeInfo), 
new StreamRecordSerializer<OUT>(
+                                               outTypeInfo), null);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("CO-TASK: {}", vertexName);
+               }
+       }
+
+       /**
+        * Sets vertex parameters in the stream graph
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param vertexClass
+        *            The class of the vertex
+        * @param invokableObject
+        *            The user defined invokable object
+        * @param operatorName
+        *            Type of the user defined operator
+        * @param parallelism
+        *            Number of parallel instances created
+        */
+       private void addVertex(String vertexName, Class<? extends 
AbstractInvokable> vertexClass,
+                       StreamInvokable<?, ?> invokableObject, String 
operatorName, int parallelism) {
+
+               jobVertexClasses.put(vertexName, vertexClass);
+               setParallelism(vertexName, parallelism);
+               invokableObjects.put(vertexName, invokableObject);
+               operatorNames.put(vertexName, operatorName);
+               outEdgeLists.put(vertexName, new ArrayList<String>());
+               outEdgeTypes.put(vertexName, new ArrayList<Integer>());
+               selectedNames.put(vertexName, new ArrayList<List<String>>());
+               outEdgeSelectAlls.put(vertexName, new ArrayList<Boolean>());
+               inEdgeLists.put(vertexName, new ArrayList<String>());
+               outputPartitioners.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
+               iterationTailCount.put(vertexName, 0);
+       }
+
+       /**
+        * Connects two vertices in the stream graph using the selected partitioner
+        * settings
+        * 
+        * @param upStreamVertexName
+        *            Name of the upstream(output) vertex
+        * @param downStreamVertexName
+        *            Name of the downstream(input) vertex
+        * @param partitionerObject
+        *            Partitioner object
+        * @param typeNumber
+        *            Number of the type (used at co-functions)
+        * @param outputNames
+        *            User defined names of the out edge
+        */
+       public void setEdge(String upStreamVertexName, String 
downStreamVertexName,
+                       StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames,
+                       boolean selectAll) {
+               outEdgeLists.get(upStreamVertexName).add(downStreamVertexName);
+               outEdgeTypes.get(upStreamVertexName).add(typeNumber);
+               inEdgeLists.get(downStreamVertexName).add(upStreamVertexName);
+               
outputPartitioners.get(upStreamVertexName).add(partitionerObject);
+               selectedNames.get(upStreamVertexName).add(outputNames);
+               outEdgeSelectAlls.get(upStreamVertexName).add(selectAll);
+       }
+
+       private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
+                       StreamRecordSerializer<?> in2, 
StreamRecordSerializer<?> out1,
+                       StreamRecordSerializer<?> out2) {
+               typeSerializersIn1.put(vertexName, in1);
+               typeSerializersIn2.put(vertexName, in2);
+               typeSerializersOut1.put(vertexName, out1);
+               typeSerializersOut2.put(vertexName, out2);
+       }
+
+       /**
+        * Sets the number of parallel instances created for the given vertex.
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param parallelism
+        *            Number of parallel instances created
+        */
+       public void setParallelism(String vertexName, int parallelism) {
+               operatorParallelisms.put(vertexName, parallelism);
+       }
+
+       public int getParallelism(String vertexName) {
+               return operatorParallelisms.get(vertexName);
+       }
+
+       /**
+        * Sets the input format for the given vertex.
+        * 
+        * @param vertexName
+        *            Name of the vertex
+        * @param inputFormat
+        *            input format of the file source associated with the given
+        *            vertex
+        */
+       public void setInputFormat(String vertexName, InputFormat<String, ?> 
inputFormat) {
+               inputFormatLists.put(vertexName, inputFormat);
+       }
+
+       public void setBufferTimeout(String vertexName, long bufferTimeout) {
+               this.bufferTimeouts.put(vertexName, bufferTimeout);
+       }
+
+       public long getBufferTimeout(String vertexName) {
+               return this.bufferTimeouts.get(vertexName);
+       }
+
+       public void addOperatorState(String veretxName, String stateName, 
OperatorState<?> state) {
+               Map<String, OperatorState<?>> states = 
operatorStates.get(veretxName);
+               if (states == null) {
+                       states = new HashMap<String, OperatorState<?>>();
+                       states.put(stateName, state);
+               } else {
+                       if (states.containsKey(stateName)) {
+                               throw new RuntimeException("State has already 
been registered with this name: "
+                                               + stateName);
+                       } else {
+                               states.put(stateName, state);
+                       }
+               }
+               operatorStates.put(veretxName, states);
+       }
+
+       /**
+        * Sets the parallelism and buffer timeout of the iteration head of the 
given
+        * iteration id to the parallelism given.
+        * 
+        * @param iterationID
+        *            ID of the iteration
+        * @param iterationTail
+        *            ID of the iteration tail
+        */
+       public void setIterationSourceSettings(String iterationID, String 
iterationTail) {
+               setParallelism(iterationIDtoHeadName.get(iterationID),
+                               operatorParallelisms.get(iterationTail));
+               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeouts.get(iterationTail));
+       }
+
+       /**
+        * Sets a user defined {@link OutputSelector} for the given operator. 
Used
+        * for directed emits.
+        * 
+        * @param vertexName
+        *            Name of the vertex for which the output selector will be 
set
+        * @param outputSelector
+        *            The outputselector object
+        */
+       public void setOutputSelector(String vertexName, OutputSelector<?> 
outputSelector) {
+               outputSelectors.put(vertexName, outputSelector);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Outputselector set for {}", vertexName);
+               }
+
+       }
+
+       public <IN, OUT> void setInvokable(String vertexName, 
StreamInvokable<IN, OUT> invokableObject) {
+               invokableObjects.put(vertexName, invokableObject);
+       }
+
+       public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
+               StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType);
+               typeSerializersOut1.put(id, serializer);
+       }
+
+       public StreamInvokable<?, ?> getInvokable(String vertexName) {
+               return invokableObjects.get(vertexName);
+       }
+
+       @SuppressWarnings("unchecked")
+       public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String 
vertexName) {
+               return (StreamRecordSerializer<OUT>) 
typeSerializersOut1.get(vertexName);
+       }
+
+       @SuppressWarnings("unchecked")
+       public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String 
vertexName) {
+               return (StreamRecordSerializer<OUT>) 
typeSerializersOut2.get(vertexName);
+       }
+
+       @SuppressWarnings("unchecked")
+       public <IN> StreamRecordSerializer<IN> getInSerializer1(String 
vertexName) {
+               return (StreamRecordSerializer<IN>) 
typeSerializersIn1.get(vertexName);
+       }
+
+       @SuppressWarnings("unchecked")
+       public <IN> StreamRecordSerializer<IN> getInSerializer2(String 
vertexName) {
+               return (StreamRecordSerializer<IN>) 
typeSerializersIn2.get(vertexName);
+       }
+
+       /**
+        * Sets TypeSerializerWrapper from one vertex to another, used with some
+        * sinks.
+        * 
+        * @param from
+        *            from
+        * @param to
+        *            to
+        */
+       public void setSerializersFrom(String from, String to) {
+               operatorNames.put(to, operatorNames.get(from));
+
+               typeSerializersIn1.put(to, typeSerializersOut1.get(from));
+               typeSerializersIn2.put(to, typeSerializersOut2.get(from));
+               typeSerializersOut1.put(to, typeSerializersOut1.get(from));
+               typeSerializersOut2.put(to, typeSerializersOut2.get(from));
+       }
+
+       /**
+        * Gets the assembled {@link JobGraph} and adds a default name for it.
+        */
+       public JobGraph getJobGraph() {
+               return getJobGraph(DEAFULT_JOB_NAME);
+       }
+
+       /**
+        * Gets the assembled {@link JobGraph} and adds a user specified name 
for
+        * it.
+        * 
+        * @param jobGraphName
+        *            name of the jobGraph
+        */
+       public JobGraph getJobGraph(String jobGraphName) {
+
+               StreamingJobGraphGenerator optimizer = new 
StreamingJobGraphGenerator(this);
+
+               return optimizer.createJobGraph(jobGraphName);
+       }
+
+       public void setChaining(boolean chaining) {
+               this.chaining = chaining;
+       }
+
+       public Collection<String> getSources() {
+               return sources;
+       }
+
+       public List<String> getOutEdges(String vertexName) {
+               return outEdgeLists.get(vertexName);
+       }
+
+       public List<String> getInEdges(String vertexName) {
+               return inEdgeLists.get(vertexName);
+       }
+
+       public List<Integer> getOutEdgeTypes(String vertexName) {
+
+               return outEdgeTypes.get(vertexName);
+       }
+
+       public StreamPartitioner<?> getOutPartitioner(String vertexName, int 
outputIndex) {
+               return outputPartitioners.get(vertexName).get(outputIndex);
+       }
+
+       public List<String> getSelectedNames(String vertexName, int 
outputIndex) {
+               return selectedNames.get(vertexName).get(outputIndex);
+       }
+
+       public Boolean isSelectAll(String vertexName, int outputIndex) {
+               return outEdgeSelectAlls.get(vertexName).get(outputIndex);
+       }
+
+       public Collection<Integer> getIterationIDs() {
+               return new HashSet<Integer>(iterationIds.values());
+       }
+
+       public String getIterationTail(int iterID) {
+               return iterationIDtoTailName.get(iterID);
+       }
+
+       public String getIterationHead(int iterID) {
+               return iterationIDtoHeadName.get(iterID);
+       }
+
+       public Class<? extends AbstractInvokable> getJobVertexClass(String 
vertexName) {
+               return jobVertexClasses.get(vertexName);
+       }
+
+       public InputFormat<String, ?> getInputFormat(String vertexName) {
+               return inputFormatLists.get(vertexName);
+       }
+
+       public OutputSelector<?> getOutputSelector(String vertexName) {
+               return outputSelectors.get(vertexName);
+       }
+
+       public Map<String, OperatorState<?>> getState(String vertexName) {
+               return operatorStates.get(vertexName);
+       }
+
+       public Integer getIterationID(String vertexName) {
+               return iterationIds.get(vertexName);
+       }
+
+       public long getIterationTimeout(String vertexName) {
+               return iterationTimeouts.get(vertexName);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
new file mode 100644
index 0000000..6c0cc20
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import 
org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingJobGraphGenerator {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
+
+       private StreamGraph streamGraph;
+
+       private Map<String, AbstractJobVertex> streamVertices;
+       private JobGraph jobGraph;
+       private Collection<String> builtNodes;
+
+       private Map<String, Map<String, StreamConfig>> chainedConfigs;
+       private Map<String, StreamConfig> vertexConfigs;
+       private Map<String, String> chainedNames;
+
+       public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+               this.streamGraph = streamGraph;
+       }
+
+       private void init() {
+               this.streamVertices = new HashMap<String, AbstractJobVertex>();
+               this.builtNodes = new HashSet<String>();
+               this.chainedConfigs = new HashMap<String, Map<String, 
StreamConfig>>();
+               this.vertexConfigs = new HashMap<String, StreamConfig>();
+               this.chainedNames = new HashMap<String, String>();
+       }
+
+       public JobGraph createJobGraph(String jobName) {
+               jobGraph = new JobGraph(jobName);
+               init();
+
+               for (String sourceName : streamGraph.getSources()) {
+                       createChain(sourceName, sourceName);
+               }
+
+               setSlotSharing();
+
+               return jobGraph;
+       }
+
+       private List<Tuple2<String, String>> createChain(String startNode, 
String current) {
+
+               if (!builtNodes.contains(startNode)) {
+
+                       List<Tuple2<String, String>> transitiveOutEdges = new 
ArrayList<Tuple2<String, String>>();
+                       List<String> chainableOutputs = new ArrayList<String>();
+                       List<String> nonChainableOutputs = new 
ArrayList<String>();
+
+                       for (String outName : streamGraph.getOutEdges(current)) 
{
+                               if (isChainable(current, outName)) {
+                                       chainableOutputs.add(outName);
+                               } else {
+                                       nonChainableOutputs.add(outName);
+                               }
+                       }
+
+                       for (String chainable : chainableOutputs) {
+                               
transitiveOutEdges.addAll(createChain(startNode, chainable));
+                       }
+
+                       for (String nonChainable : nonChainableOutputs) {
+                               transitiveOutEdges.add(new Tuple2<String, 
String>(current, nonChainable));
+                               createChain(nonChainable, nonChainable);
+                       }
+
+                       chainedNames.put(current, createChainedName(current, 
chainableOutputs));
+
+                       StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode)
+                                       : new StreamConfig(new Configuration());
+
+                       setVertexConfig(current, config, chainableOutputs, 
nonChainableOutputs);
+
+                       if (current.equals(startNode)) {
+
+                               config.setChainStart();
+                               config.setOutEdgesInOrder(transitiveOutEdges);
+
+                               for (Tuple2<String, String> edge : 
transitiveOutEdges) {
+                                       connect(startNode, edge);
+                               }
+
+                               
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
+
+                       } else {
+
+                               Map<String, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
+
+                               if (chainedConfs == null) {
+                                       chainedConfigs.put(startNode, new 
HashMap<String, StreamConfig>());
+                               }
+                               chainedConfigs.get(startNode).put(current, 
config);
+                       }
+
+                       return transitiveOutEdges;
+
+               } else {
+                       return new ArrayList<Tuple2<String, String>>();
+               }
+       }
+
+       private String createChainedName(String vertexName, List<String> 
chainedOutputs) {
+               if (chainedOutputs.size() > 1) {
+                       List<String> outputChainedNames = new 
ArrayList<String>();
+                       for (String chainable : chainedOutputs) {
+                               
outputChainedNames.add(chainedNames.get(chainable));
+                       }
+
+                       return vertexName + " -> (" + 
StringUtils.join(outputChainedNames, ", ") + ")";
+               } else if (chainedOutputs.size() == 1) {
+                       return vertexName + " -> " + 
chainedNames.get(chainedOutputs.get(0));
+               } else {
+                       return vertexName;
+               }
+
+       }
+
+       private StreamConfig createProcessingVertex(String vertexName) {
+
+               AbstractJobVertex vertex = new 
AbstractJobVertex(chainedNames.get(vertexName));
+
+               
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName));
+               vertex.setParallelism(streamGraph.getParallelism(vertexName));
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Parallelism set: {} for {}", 
streamGraph.getParallelism(vertexName),
+                                       vertexName);
+               }
+
+               if (streamGraph.getInputFormat(vertexName) != null) {
+                       
vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName));
+               }
+
+               streamVertices.put(vertexName, vertex);
+               builtNodes.add(vertexName);
+               jobGraph.addVertex(vertex);
+
+               return new StreamConfig(vertex.getConfiguration());
+       }
+
+       private void setVertexConfig(String vertexName, StreamConfig config,
+                       List<String> chainableOutputs, List<String> 
nonChainableOutputs) {
+
+               config.setVertexName(vertexName);
+               
config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName));
+
+               
config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName));
+               
config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName));
+               
config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName));
+               
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
+
+               config.setUserInvokable(streamGraph.getInvokable(vertexName));
+               
config.setOutputSelector(streamGraph.getOutputSelector(vertexName));
+               config.setOperatorStates(streamGraph.getState(vertexName));
+
+               config.setNumberOfOutputs(nonChainableOutputs.size());
+               config.setOutputs(nonChainableOutputs);
+               config.setChainedOutputs(chainableOutputs);
+
+               Class<? extends AbstractInvokable> vertexClass = 
streamGraph.getJobVertexClass(vertexName);
+
+               if (vertexClass.equals(StreamIterationHead.class)
+                               || 
vertexClass.equals(StreamIterationTail.class)) {
+                       
config.setIterationId(streamGraph.getIterationID(vertexName));
+                       
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName));
+               }
+
+               vertexConfigs.put(vertexName, config);
+       }
+
+       private <T> void connect(String headOfChain, Tuple2<String, String> 
edge) {
+
+               String upStreamVertexName = edge.f0;
+               String downStreamVertexName = edge.f1;
+
+               int outputIndex = 
streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName);
+
+               AbstractJobVertex headVertex = streamVertices.get(headOfChain);
+               AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamVertexName);
+
+               StreamConfig downStreamConfig = new 
StreamConfig(downStreamVertex.getConfiguration());
+               StreamConfig upStreamConfig = new 
StreamConfig(headVertex.getConfiguration());
+
+               List<Integer> outEdgeIndexList = 
streamGraph.getOutEdgeTypes(upStreamVertexName);
+               int numOfInputs = downStreamConfig.getNumberOfInputs();
+
+               downStreamConfig.setInputIndex(numOfInputs++, 
outEdgeIndexList.get(outputIndex));
+               downStreamConfig.setNumberOfInputs(numOfInputs);
+
+               StreamPartitioner<?> partitioner = 
streamGraph.getOutPartitioner(upStreamVertexName,
+                               outputIndex);
+
+               upStreamConfig.setPartitioner(downStreamVertexName, 
partitioner);
+
+               if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
+                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
+               } else {
+                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.ALL_TO_ALL);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("CONNECTED: {} - {} -> {}", 
partitioner.getClass().getSimpleName(),
+                                       headOfChain, downStreamVertexName);
+               }
+
+               upStreamConfig.setOutputNames(downStreamVertexName,
+                               
streamGraph.getSelectedNames(upStreamVertexName, outputIndex));
+               upStreamConfig.setSelectAll(downStreamVertexName,
+                               streamGraph.isSelectAll(upStreamVertexName, 
outputIndex));
+       }
+
+       private boolean isChainable(String vertexName, String outName) {
+
+               StreamInvokable<?, ?> headInvokable = 
streamGraph.getInvokable(vertexName);
+               StreamInvokable<?, ?> outInvokable = 
streamGraph.getInvokable(outName);
+
+               return streamGraph.getInEdges(outName).size() == 1
+                               && outInvokable != null
+                               && streamGraph.getOutputSelector(vertexName) == 
null
+                               && outInvokable.getChainingStrategy() == 
ChainingStrategy.ALWAYS
+                               && (headInvokable.getChainingStrategy() == 
ChainingStrategy.HEAD || headInvokable
+                                               .getChainingStrategy() == 
ChainingStrategy.ALWAYS)
+                               && streamGraph.getOutPartitioner(vertexName,
+                                               
streamGraph.getOutEdges(vertexName).indexOf(outName)).getStrategy() == 
PartitioningStrategy.FORWARD
+                               && streamGraph.getParallelism(vertexName) == 
streamGraph.getParallelism(outName)
+                               && streamGraph.chaining;
+       }
+
+       private void setSlotSharing() {
+               SlotSharingGroup shareGroup = new SlotSharingGroup();
+
+               for (AbstractJobVertex vertex : streamVertices.values()) {
+                       vertex.setSlotSharingGroup(shareGroup);
+               }
+
+               for (Integer iterID : streamGraph.getIterationIDs()) {
+                       CoLocationGroup ccg = new CoLocationGroup();
+                       AbstractJobVertex tail = 
streamVertices.get(streamGraph.getIterationTail(iterID));
+                       AbstractJobVertex head = 
streamVertices.get(streamGraph.getIterationHead(iterID));
+
+                       ccg.addVertex(head);
+                       ccg.addVertex(tail);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 80ea970..db8649b 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.StreamGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
@@ -52,7 +52,7 @@ import 
org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 public class ConnectedDataStream<IN1, IN2> {
 
        protected StreamExecutionEnvironment environment;
-       protected JobGraphBuilder jobGraphBuilder;
+       protected StreamGraph jobGraphBuilder;
        protected DataStream<IN1> dataStream1;
        protected DataStream<IN2> dataStream2;
 
@@ -61,7 +61,7 @@ public class ConnectedDataStream<IN1, IN2> {
        protected KeySelector<IN2, ?> keySelector2;
 
        protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> 
input2) {
-               this.jobGraphBuilder = input1.jobGraphBuilder;
+               this.jobGraphBuilder = input1.streamGraph;
                this.environment = input1.environment;
                this.dataStream1 = input1.copy();
                this.dataStream2 = input2.copy();
@@ -402,7 +402,7 @@ public class ConnectedDataStream<IN1, IN2> {
                SingleOutputStreamOperator<OUT, ?> returnStream = new 
SingleOutputStreamOperator(
                                environment, functionName, outTypeInfo, 
functionInvokable);
 
-               dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), 
functionInvokable,
+               dataStream1.streamGraph.addCoTask(returnStream.getId(), 
functionInvokable,
                                getInputType1(), getInputType2(), outTypeInfo, 
functionName,
                                environment.getDegreeOfParallelism());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f68ab68..8e7d823 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -43,7 +43,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.StreamGraph;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -102,7 +102,7 @@ public class DataStream<OUT> {
        protected TypeInformation typeInfo;
        protected List<DataStream<OUT>> mergedStreams;
 
-       protected final JobGraphBuilder jobGraphBuilder;
+       protected final StreamGraph streamGraph;
 
        /**
         * Create a new {@link DataStream} in the given execution environment 
with
@@ -125,7 +125,7 @@ public class DataStream<OUT> {
                this.id = operatorType + "-" + counter.toString();
                this.environment = environment;
                this.degreeOfParallelism = environment.getDegreeOfParallelism();
-               this.jobGraphBuilder = environment.getJobGraphBuilder();
+               this.streamGraph = environment.getStreamGraph();
                this.userDefinedNames = new ArrayList<String>();
                this.selectAll = true;
                this.partitioner = new DistributePartitioner<OUT>(true);
@@ -147,7 +147,7 @@ public class DataStream<OUT> {
                this.userDefinedNames = new 
ArrayList<String>(dataStream.userDefinedNames);
                this.selectAll = dataStream.selectAll;
                this.partitioner = dataStream.partitioner;
-               this.jobGraphBuilder = dataStream.jobGraphBuilder;
+               this.streamGraph = dataStream.streamGraph;
                this.typeInfo = dataStream.typeInfo;
                this.mergedStreams = new ArrayList<DataStream<OUT>>();
                this.mergedStreams.add(this);
@@ -189,7 +189,7 @@ public class DataStream<OUT> {
 
        @SuppressWarnings("unchecked")
        public <R> DataStream<R> setType(TypeInformation<R> outType) {
-               jobGraphBuilder.setOutType(id, outType);
+               streamGraph.setOutType(id, outType);
                typeInfo = outType;
                return (DataStream<R>) this;
        }
@@ -1085,7 +1085,7 @@ public class DataStream<OUT> {
                DataStream<R> returnStream = new 
DataStreamSource<R>(environment, "iterationSource", null,
                                null, true);
 
-               jobGraphBuilder.addIterationHead(returnStream.getId(), 
this.getId(), iterationID,
+               streamGraph.addIterationHead(returnStream.getId(), 
this.getId(), iterationID,
                                degreeOfParallelism, waitTime);
 
                return this.copy();
@@ -1112,7 +1112,7 @@ public class DataStream<OUT> {
                SingleOutputStreamOperator<R, ?> returnStream = new 
SingleOutputStreamOperator(environment,
                                operatorName, outTypeInfo, invokable);
 
-               jobGraphBuilder.addStreamVertex(returnStream.getId(), 
invokable, getType(), outTypeInfo,
+               streamGraph.addStreamVertex(returnStream.getId(), invokable, 
getType(), outTypeInfo,
                                operatorName, degreeOfParallelism);
 
                connectGraph(inputStream, returnStream.getId(), 0);
@@ -1157,7 +1157,7 @@ public class DataStream<OUT> {
         */
        protected <X> void connectGraph(DataStream<X> inputStream, String 
outputID, int typeNumber) {
                for (DataStream<X> stream : inputStream.mergedStreams) {
-                       jobGraphBuilder.setEdge(stream.getId(), outputID, 
stream.partitioner, typeNumber,
+                       streamGraph.setEdge(stream.getId(), outputID, 
stream.partitioner, typeNumber,
                                        inputStream.userDefinedNames, 
inputStream.selectAll);
                }
 
@@ -1179,7 +1179,7 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType(),
                                sinkInvokable);
 
-               jobGraphBuilder.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null,
+               streamGraph.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null,
                                "sink", degreeOfParallelism);
 
                this.connectGraph(this.copy(), returnStream.getId(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index c4cd1e1..6f66b2c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -64,10 +64,10 @@ public class IterativeDataStream<IN> extends
                DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "iterationSink", null,
                                null);
 
-               jobGraphBuilder.addIterationTail(iterationSink.getId(), 
iterationTail.getId(), iterationID,
+               streamGraph.addIterationTail(iterationSink.getId(), 
iterationTail.getId(), iterationID,
                                iterationTail.getParallelism(), waitTime);
 
-               
jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), 
iterationTail.getId());
+               streamGraph.setIterationSourceSettings(iterationID.toString(), 
iterationTail.getId());
                connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
                return iterationTail;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index b8ca42c..dbfbc48 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.datastream;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.collector.OutputSelector;
@@ -65,7 +63,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
 
        @SuppressWarnings("unchecked")
        public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> 
outType) {
-               jobGraphBuilder.setOutType(id, outType);
+               streamGraph.setOutType(id, outType);
                typeInfo = outType;
                return (SingleOutputStreamOperator<R, ?>) this;
        }
@@ -84,7 +82,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
                }
                this.degreeOfParallelism = dop;
 
-               jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+               streamGraph.setParallelism(id, degreeOfParallelism);
 
                return this;
        }
@@ -98,7 +96,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
         * @return The operator with buffer timeout set.
         */
        public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long 
timeoutMillis) {
-               jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+               streamGraph.setBufferTimeout(id, timeoutMillis);
                return this;
        }
 
@@ -115,13 +113,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
        public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
                if (!isSplit) {
                        this.isSplit = true;
-                       try {
-                               jobGraphBuilder.setOutputSelector(id,
-                                               
SerializationUtils.serialize(clean(outputSelector)));
-
-                       } catch (SerializationException e) {
-                               throw new RuntimeException("Cannot serialize 
OutputSelector");
-                       }
+                       streamGraph.setOutputSelector(id, 
clean(outputSelector));
 
                        return new SplitDataStream<OUT>(this);
                } else {
@@ -144,7 +136,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
         * @return The data stream with state registered.
         */
        protected SingleOutputStreamOperator<OUT, O> registerState(String name, 
OperatorState<?> state) {
-               jobGraphBuilder.addOperatorState(getId(), name, state);
+               streamGraph.addOperatorState(getId(), name, state);
                return this;
        }
 
@@ -161,7 +153,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
         */
        protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, 
OperatorState<?>> states) {
                for (Entry<String, OperatorState<?>> entry : states.entrySet()) 
{
-                       jobGraphBuilder.addOperatorState(getId(), 
entry.getKey(), entry.getValue());
+                       streamGraph.addOperatorState(getId(), entry.getKey(), 
entry.getValue());
                }
 
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
index 8422400..03160c2 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -67,7 +67,7 @@ public class StreamCrossOperator<I1, I2> extends
 
                @SuppressWarnings("unchecked")
                public CrossWindow<I1, I2> every(long length) {
-                       ((CoWindowInvokable<I1, I2, ?>) 
jobGraphBuilder.getInvokable(id)).setSlideSize(length);
+                       ((CoWindowInvokable<I1, I2, ?>) 
streamGraph.getInvokable(id)).setSlideSize(length);
                        return this;
                }
 
@@ -90,7 +90,7 @@ public class StreamCrossOperator<I1, I2> extends
                                        new CrossWindowFunction<I1, I2, 
R>(clean(function)), op.windowSize,
                                        op.slideInterval, op.timeStamp1, 
op.timeStamp2);
 
-                       jobGraphBuilder.setInvokable(id, invokable);
+                       streamGraph.setInvokable(id, invokable);
 
                        return setType(outTypeInfo);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
index 626b9f1..d2b2032 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -245,7 +245,7 @@ public class StreamJoinOperator<I1, I2> extends
                                        getJoinWindowFunction(joinFunction, 
predicate), predicate.op.windowSize,
                                        predicate.op.slideInterval, 
predicate.op.timeStamp1, predicate.op.timeStamp2);
 
-                       jobGraphBuilder.setInvokable(id, invokable);
+                       streamGraph.setInvokable(id, invokable);
 
                        return setType(outType);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 3e44012..4824fca 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -29,7 +29,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @Override
        public void execute() throws Exception {
-               
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), 
getDegreeOfParallelism());
+               ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), 
getDegreeOfParallelism());
        }
 
        /**
@@ -41,7 +41,7 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @Override
        public void execute(String jobName) throws Exception {
-               
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+               
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
                                getDegreeOfParallelism());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index d833c8e..2eb05ad 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -81,14 +81,14 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
        @Override
        public void execute() {
 
-               JobGraph jobGraph = jobGraphBuilder.getJobGraph();
+               JobGraph jobGraph = streamGraph.getJobGraph();
                executeRemotely(jobGraph);
        }
 
        @Override
        public void execute(String jobName) {
 
-               JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+               JobGraph jobGraph = streamGraph.getJobGraph(jobName);
                executeRemotely(jobGraph);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index c157435..a9a5bd3 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -54,9 +54,9 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
 
                JobGraph jobGraph;
                if (jobName == null) {
-                       jobGraph = this.jobGraphBuilder.getJobGraph();
+                       jobGraph = this.streamGraph.getJobGraph();
                } else {
-                       jobGraph = this.jobGraphBuilder.getJobGraph(jobName);
+                       jobGraph = this.streamGraph.getJobGraph(jobName);
                }
 
                for (File file : jars) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4194864..51dc0ae 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.StreamGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
@@ -61,7 +61,7 @@ public abstract class StreamExecutionEnvironment {
 
        private ExecutionConfig config = new ExecutionConfig();
 
-       protected JobGraphBuilder jobGraphBuilder;
+       protected StreamGraph streamGraph;
 
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
@@ -71,7 +71,7 @@ public abstract class StreamExecutionEnvironment {
         * Constructor for creating StreamExecutionEnvironment
         */
        protected StreamExecutionEnvironment() {
-               jobGraphBuilder = new JobGraphBuilder();
+               streamGraph = new StreamGraph();
        }
 
        /**
@@ -352,7 +352,7 @@ public abstract class StreamExecutionEnvironment {
                        TypeInformation<String> typeInfo) {
                FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
                DataStreamSource<String> returnStream = addSource(function, 
null, "fileSource");
-               jobGraphBuilder.setInputFormat(returnStream.getId(), 
inputFormat);
+               streamGraph.setInputFormat(returnStream.getId(), inputFormat);
                return returnStream;
        }
 
@@ -437,7 +437,7 @@ public abstract class StreamExecutionEnvironment {
                DataStreamSource<OUT> returnStream = new 
DataStreamSource<OUT>(this, sourceName,
                                outTypeInfo, sourceInvokable, isParallel);
 
-               jobGraphBuilder.addSourceVertex(returnStream.getId(), 
sourceInvokable, null, outTypeInfo,
+               streamGraph.addSourceVertex(returnStream.getId(), 
sourceInvokable, null, outTypeInfo,
                                sourceName, dop);
 
                return returnStream;
@@ -584,12 +584,12 @@ public abstract class StreamExecutionEnvironment {
        public abstract void execute(String jobName) throws Exception;
 
        /**
-        * Getter of the {@link JobGraphBuilder} of the streaming job.
+        * Getter of the {@link StreamGraph} of the streaming job.
         * 
-        * @return jobGraphBuilder
+        * @return The streamgraph representing the transformations
         */
-       public JobGraphBuilder getJobGraphBuilder() {
-               return jobGraphBuilder;
+       public StreamGraph getStreamGraph() {
+               return streamGraph;
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index 01c0545..c2177fa 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -34,7 +34,6 @@ public class GroupedReduceInvokable<IN> extends 
StreamReduceInvokable<IN> {
                super(reducer);
                this.keySelector = keySelector;
                values = new HashMap<Object, IN>();
-               setChainingStrategy(ChainingStrategy.NEVER);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index bbd7b0c..a46fa96 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -67,7 +67,7 @@ import 
org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  * method. Fake elements created on prenotification will be forwarded to all
  * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
  * it forwards/distributed calls all groups.
- *
+ * 
  * @param <IN>
  *            The type of input elements handled by this operator invokable.
  */
@@ -154,7 +154,6 @@ public class GroupedWindowInvokable<IN, OUT> extends 
StreamInvokable<IN, OUT> {
                        LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) 
{
 
                super(userFunction);
-               setChainingStrategy(ChainingStrategy.NEVER);
 
                this.keySelector = keySelector;
 

Reply via email to