Repository: flink
Updated Branches:
  refs/heads/master bed3da4a6 -> a911559a1


[FLINK-1592] [streaming] StreamGraph refactor to store vertex IDs as Integers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a911559a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a911559a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a911559a

Branch: refs/heads/master
Commit: a911559a1d8d8d160edcdb76cd16812de80b0d1c
Parents: bed3da4
Author: Gyula Fora <[email protected]>
Authored: Sat Feb 21 14:23:04 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Sat Feb 21 14:23:04 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  52 +--
 .../apache/flink/streaming/api/StreamGraph.java | 373 +++++++++----------
 .../api/StreamingJobGraphGenerator.java         | 152 ++++----
 .../flink/streaming/api/WindowingOptimzier.java |  50 +--
 .../streaming/api/datastream/DataStream.java    |  12 +-
 .../api/datastream/DiscretizedStream.java       |  42 ++-
 .../api/datastream/WindowedDataStream.java      |   9 +-
 .../api/streamvertex/OutputHandler.java         |  20 +-
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../windowing/WindowIntegrationTest.java        |  24 ++
 10 files changed, 384 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 658b742..efc0b8b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -47,7 +47,7 @@ public class StreamConfig implements Serializable {
        private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
        private static final String OUTPUT_NAME = "outputName_";
        private static final String PARTITIONER_OBJECT = "partitionerObject_";
-       private static final String VERTEX_NAME = "vertexName";
+       private static final String VERTEX_NAME = "vertexID";
        private static final String ITERATION_ID = "iteration-id";
        private static final String OUTPUT_SELECTOR = "outputSelector";
        private static final String DIRECTED_EMIT = "directedEmit";
@@ -60,7 +60,7 @@ public class StreamConfig implements Serializable {
        private static final String TYPE_SERIALIZER_OUT_1 = 
"typeSerializer_out_1";
        private static final String TYPE_SERIALIZER_OUT_2 = 
"typeSerializer_out_2";
        private static final String ITERATON_WAIT = "iterationWait";
-       private static final String OUTPUTS = "outVertexNames";
+       private static final String OUTPUTS = "outvertexIDs";
        private static final String EDGES_IN_ORDER = "rwOrder";
 
        // DEFAULT VALUES
@@ -79,12 +79,12 @@ public class StreamConfig implements Serializable {
                return config;
        }
 
-       public void setVertexName(String vertexName) {
-               config.setString(VERTEX_NAME, vertexName);
+       public void setVertexID(Integer vertexID) {
+               config.setInteger(VERTEX_NAME, vertexID);
        }
 
-       public String getTaskName() {
-               return config.getString(VERTEX_NAME, "Missing");
+       public Integer getVertexID() {
+               return config.getInteger(VERTEX_NAME, -1);
        }
 
        public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
@@ -223,14 +223,14 @@ public class StreamConfig implements Serializable {
                return config.getLong(ITERATON_WAIT, 0);
        }
 
-       public <T> void setPartitioner(String output, StreamPartitioner<T> 
partitionerObject) {
+       public <T> void setPartitioner(Integer output, StreamPartitioner<T> 
partitionerObject) {
 
                config.setBytes(PARTITIONER_OBJECT + output,
                                
SerializationUtils.serialize(partitionerObject));
        }
 
        @SuppressWarnings("unchecked")
-       public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String 
output) {
+       public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, Integer 
output) {
                StreamPartitioner<T> partitioner = null;
                try {
                        partitioner = (StreamPartitioner<T>) 
InstantiationUtil.readObjectFromConfig(
@@ -241,7 +241,7 @@ public class StreamConfig implements Serializable {
                return partitioner;
        }
 
-       public void setSelectedNames(String output, List<String> selected) {
+       public void setSelectedNames(Integer output, List<String> selected) {
                if (selected != null) {
                        config.setBytes(OUTPUT_NAME + output,
                                        
SerializationUtils.serialize((Serializable) selected));
@@ -252,7 +252,7 @@ public class StreamConfig implements Serializable {
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getSelectedNames(String output) {
+       public List<String> getSelectedNames(Integer output) {
                return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
                                null));
        }
@@ -273,28 +273,28 @@ public class StreamConfig implements Serializable {
                return config.getInteger(NUMBER_OF_OUTPUTS, 0);
        }
 
-       public void setOutputs(List<String> outputVertexNames) {
-               config.setBytes(OUTPUTS, 
SerializationUtils.serialize((Serializable) outputVertexNames));
+       public void setOutputs(List<Integer> outputvertexIDs) {
+               config.setBytes(OUTPUTS, 
SerializationUtils.serialize((Serializable) outputvertexIDs));
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getOutputs(ClassLoader cl) {
+       public List<Integer> getOutputs(ClassLoader cl) {
                try {
-                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+                       return (List<Integer>) 
InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
outputs.");
                }
        }
 
-       public void setOutEdgesInOrder(List<Tuple2<String, String>> 
outEdgeList) {
+       public void setOutEdgesInOrder(List<Tuple2<Integer, Integer>> 
outEdgeList) {
 
                config.setBytes(EDGES_IN_ORDER, 
SerializationUtils.serialize((Serializable) outEdgeList));
        }
 
        @SuppressWarnings("unchecked")
-       public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) {
+       public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader 
cl) {
                try {
-                       return (List<Tuple2<String, String>>) 
InstantiationUtil.readObjectFromConfig(
+                       return (List<Tuple2<Integer, Integer>>) 
InstantiationUtil.readObjectFromConfig(
                                        this.config, EDGES_IN_ORDER, cl);
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
outputs.");
@@ -323,34 +323,34 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       public void setChainedOutputs(List<String> chainedOutputs) {
+       public void setChainedOutputs(List<Integer> chainedOutputs) {
                config.setBytes(CHAINED_OUTPUTS,
                                SerializationUtils.serialize((Serializable) 
chainedOutputs));
        }
 
        @SuppressWarnings("unchecked")
-       public List<String> getChainedOutputs(ClassLoader cl) {
+       public List<Integer> getChainedOutputs(ClassLoader cl) {
                try {
-                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config,
+                       return (List<Integer>) 
InstantiationUtil.readObjectFromConfig(this.config,
                                        CHAINED_OUTPUTS, cl);
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
chained outputs.");
                }
        }
 
-       public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> 
chainedTaskConfigs) {
+       public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> 
chainedTaskConfigs) {
                config.setBytes(CHAINED_TASK_CONFIG,
                                SerializationUtils.serialize((Serializable) 
chainedTaskConfigs));
        }
 
        @SuppressWarnings("unchecked")
-       public Map<String, StreamConfig> 
getTransitiveChainedTaskConfigs(ClassLoader cl) {
+       public Map<Integer, StreamConfig> 
getTransitiveChainedTaskConfigs(ClassLoader cl) {
                try {
 
-                       Map<String, StreamConfig> confs = (Map<String, 
StreamConfig>) InstantiationUtil
+                       Map<Integer, StreamConfig> confs = (Map<Integer, 
StreamConfig>) InstantiationUtil
                                        .readObjectFromConfig(this.config, 
CHAINED_TASK_CONFIG, cl);
 
-                       return confs == null ? new HashMap<String, 
StreamConfig>() : confs;
+                       return confs == null ? new HashMap<Integer, 
StreamConfig>() : confs;
                } catch (Exception e) {
                        throw new RuntimeException("Could not instantiate 
configuration.");
                }
@@ -373,12 +373,12 @@ public class StreamConfig implements Serializable {
                builder.append("\n=======================");
                builder.append("Stream Config");
                builder.append("=======================");
-               builder.append("\nTask name: " + getTaskName());
+               builder.append("\nTask name: " + getVertexID());
                builder.append("\nNumber of non-chained inputs: " + 
getNumberOfInputs());
                builder.append("\nNumber of non-chained outputs: " + 
getNumberOfOutputs());
                builder.append("\nOutput names: " + getOutputs(cl));
                builder.append("\nPartitioning:");
-               for (String outputname : getOutputs(cl)) {
+               for (Integer outputname : getOutputs(cl)) {
                        builder.append("\n\t" + outputname + ": " + 
getPartitioner(cl, outputname));
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 8e2b49e..82cd954 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -65,31 +65,31 @@ public class StreamGraph extends StreamingPlan {
        private String jobName = DEAFULT_JOB_NAME;
 
        // Graph attributes
-       private Map<String, Integer> operatorParallelisms;
-       private Map<String, Long> bufferTimeouts;
-       private Map<String, List<String>> outEdgeLists;
-       private Map<String, List<Integer>> outEdgeTypes;
-       private Map<String, List<List<String>>> selectedNames;
-       private Map<String, List<String>> inEdgeLists;
-       private Map<String, List<StreamPartitioner<?>>> outputPartitioners;
-       private Map<String, String> operatorNames;
-       private Map<String, StreamInvokable<?, ?>> invokableObjects;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-       private Map<String, Class<? extends AbstractInvokable>> 
jobVertexClasses;
-       private Map<String, List<OutputSelector<?>>> outputSelectors;
-       private Map<String, Integer> iterationIds;
-       private Map<Integer, String> iterationIDtoHeadName;
-       private Map<Integer, String> iterationIDtoTailName;
-       private Map<String, Integer> iterationTailCount;
-       private Map<String, Long> iterationTimeouts;
-       private Map<String, Map<String, OperatorState<?>>> operatorStates;
-       private Map<String, InputFormat<String, ?>> inputFormatLists;
-       private List<Map<String, ?>> containingMaps;
-
-       private Set<String> sources;
+       private Map<Integer, Integer> operatorParallelisms;
+       private Map<Integer, Long> bufferTimeouts;
+       private Map<Integer, List<Integer>> outEdgeLists;
+       private Map<Integer, List<Integer>> outEdgeTypes;
+       private Map<Integer, List<List<String>>> selectedNames;
+       private Map<Integer, List<Integer>> inEdgeLists;
+       private Map<Integer, List<StreamPartitioner<?>>> outputPartitioners;
+       private Map<Integer, String> operatorNames;
+       private Map<Integer, StreamInvokable<?, ?>> invokableObjects;
+       private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn1;
+       private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn2;
+       private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut1;
+       private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut2;
+       private Map<Integer, Class<? extends AbstractInvokable>> 
jobVertexClasses;
+       private Map<Integer, List<OutputSelector<?>>> outputSelectors;
+       private Map<Integer, Integer> iterationIds;
+       private Map<Integer, Integer> iterationIDtoHeadID;
+       private Map<Integer, Integer> iterationIDtoTailID;
+       private Map<Integer, Integer> iterationTailCount;
+       private Map<Integer, Long> iterationTimeouts;
+       private Map<Integer, Map<String, OperatorState<?>>> operatorStates;
+       private Map<Integer, InputFormat<String, ?>> inputFormatLists;
+       private List<Map<Integer, ?>> containingMaps;
+
+       private Set<Integer> sources;
 
        private ExecutionConfig executionConfig;
 
@@ -105,58 +105,58 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public void initGraph() {
-               containingMaps = new ArrayList<Map<String, ?>>();
+               containingMaps = new ArrayList<Map<Integer, ?>>();
 
-               operatorParallelisms = new HashMap<String, Integer>();
+               operatorParallelisms = new HashMap<Integer, Integer>();
                containingMaps.add(operatorParallelisms);
-               bufferTimeouts = new HashMap<String, Long>();
+               bufferTimeouts = new HashMap<Integer, Long>();
                containingMaps.add(bufferTimeouts);
-               outEdgeLists = new HashMap<String, List<String>>();
+               outEdgeLists = new HashMap<Integer, List<Integer>>();
                containingMaps.add(outEdgeLists);
-               outEdgeTypes = new HashMap<String, List<Integer>>();
+               outEdgeTypes = new HashMap<Integer, List<Integer>>();
                containingMaps.add(outEdgeTypes);
-               selectedNames = new HashMap<String, List<List<String>>>();
+               selectedNames = new HashMap<Integer, List<List<String>>>();
                containingMaps.add(selectedNames);
-               inEdgeLists = new HashMap<String, List<String>>();
+               inEdgeLists = new HashMap<Integer, List<Integer>>();
                containingMaps.add(inEdgeLists);
-               outputPartitioners = new HashMap<String, 
List<StreamPartitioner<?>>>();
+               outputPartitioners = new HashMap<Integer, 
List<StreamPartitioner<?>>>();
                containingMaps.add(outputPartitioners);
-               operatorNames = new HashMap<String, String>();
+               operatorNames = new HashMap<Integer, String>();
                containingMaps.add(operatorNames);
-               invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
+               invokableObjects = new HashMap<Integer, StreamInvokable<?, 
?>>();
                containingMaps.add(invokableObjects);
-               typeSerializersIn1 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersIn1 = new HashMap<Integer, 
StreamRecordSerializer<?>>();
                containingMaps.add(typeSerializersIn1);
-               typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersIn2 = new HashMap<Integer, 
StreamRecordSerializer<?>>();
                containingMaps.add(typeSerializersIn2);
-               typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersOut1 = new HashMap<Integer, 
StreamRecordSerializer<?>>();
                containingMaps.add(typeSerializersOut1);
-               typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
+               typeSerializersOut2 = new HashMap<Integer, 
StreamRecordSerializer<?>>();
                containingMaps.add(typeSerializersOut1);
-               outputSelectors = new HashMap<String, 
List<OutputSelector<?>>>();
+               outputSelectors = new HashMap<Integer, 
List<OutputSelector<?>>>();
                containingMaps.add(outputSelectors);
-               jobVertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
+               jobVertexClasses = new HashMap<Integer, Class<? extends 
AbstractInvokable>>();
                containingMaps.add(jobVertexClasses);
-               iterationIds = new HashMap<String, Integer>();
+               iterationIds = new HashMap<Integer, Integer>();
                containingMaps.add(jobVertexClasses);
-               iterationIDtoHeadName = new HashMap<Integer, String>();
-               iterationIDtoTailName = new HashMap<Integer, String>();
-               iterationTailCount = new HashMap<String, Integer>();
+               iterationIDtoHeadID = new HashMap<Integer, Integer>();
+               iterationIDtoTailID = new HashMap<Integer, Integer>();
+               iterationTailCount = new HashMap<Integer, Integer>();
                containingMaps.add(iterationTailCount);
-               iterationTimeouts = new HashMap<String, Long>();
+               iterationTimeouts = new HashMap<Integer, Long>();
                containingMaps.add(iterationTailCount);
-               operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
+               operatorStates = new HashMap<Integer, Map<String, 
OperatorState<?>>>();
                containingMaps.add(operatorStates);
-               inputFormatLists = new HashMap<String, InputFormat<String, 
?>>();
+               inputFormatLists = new HashMap<Integer, InputFormat<String, 
?>>();
                containingMaps.add(operatorStates);
-               sources = new HashSet<String>();
+               sources = new HashSet<Integer>();
        }
 
        /**
         * Adds a vertex to the streaming graph with the given parameters
         * 
-        * @param vertexName
-        *            Name of the vertex
+        * @param vertexID
+        *            ID of the vertex
         * @param invokableObject
         *            User defined operator
         * @param inTypeInfo
@@ -168,38 +168,38 @@ public class StreamGraph extends StreamingPlan {
         * @param parallelism
         *            Number of parallel instances created
         */
-       public <IN, OUT> void addStreamVertex(String vertexName,
+       public <IN, OUT> void addStreamVertex(Integer vertexID,
                        StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
                        TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
 
-               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
+               addVertex(vertexID, StreamVertex.class, invokableObject, 
operatorName, parallelism);
 
                StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
                                inTypeInfo, executionConfig) : null;
                StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null 
? new StreamRecordSerializer<OUT>(
                                outTypeInfo, executionConfig) : null;
 
-               addTypeSerializers(vertexName, inSerializer, null, 
outSerializer, null);
+               addTypeSerializers(vertexID, inSerializer, null, outSerializer, 
null);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Vertex: {}", vertexName);
+                       LOG.debug("Vertex: {}", vertexID);
                }
        }
 
-       public <IN, OUT> void addSourceVertex(String vertexName,
+       public <IN, OUT> void addSourceVertex(Integer vertexID,
                        StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
                        TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
-               addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
+               addStreamVertex(vertexID, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
                                parallelism);
-               sources.add(vertexName);
+               sources.add(vertexID);
        }
 
        /**
         * Adds a vertex for the iteration head to the {@link JobGraph}. The
         * iterated values will be fed from this vertex back to the graph.
         * 
-        * @param vertexName
-        *            Name of the vertex
+        * @param vertexID
+        *            ID of the vertex
         * @param iterationHead
         *            Id of the iteration head
         * @param iterationID
@@ -209,29 +209,29 @@ public class StreamGraph extends StreamingPlan {
         * @param waitTime
         *            Max wait time for next record
         */
-       public void addIterationHead(String vertexName, String iterationHead, 
Integer iterationID,
+       public void addIterationHead(Integer vertexID, Integer iterationHead, 
Integer iterationID,
                        int parallelism, long waitTime) {
 
-               addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
+               addVertex(vertexID, StreamIterationHead.class, null, null, 
parallelism);
 
                chaining = false;
 
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoHeadName.put(iterationID, vertexName);
+               iterationIds.put(vertexID, iterationID);
+               iterationIDtoHeadID.put(iterationID, vertexID);
 
-               setSerializersFrom(iterationHead, vertexName);
+               setSerializersFrom(iterationHead, vertexID);
 
-               setEdge(vertexName, iterationHead,
+               setEdge(vertexID, iterationHead,
                                
outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
                                new ArrayList<String>());
 
-               iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), 
waitTime);
+               iterationTimeouts.put(iterationIDtoHeadID.get(iterationID), 
waitTime);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SOURCE: {}", vertexName);
+                       LOG.debug("ITERATION SOURCE: {}", vertexID);
                }
 
-               sources.add(vertexName);
+               sources.add(vertexID);
        }
 
        /**
@@ -239,8 +239,8 @@ public class StreamGraph extends StreamingPlan {
         * intended to be iterated will be sent to this sink from the iteration
         * head.
         * 
-        * @param vertexName
-        *            Name of the vertex
+        * @param vertexID
+        *            ID of the vertex
         * @param iterationTail
         *            Id of the iteration tail
         * @param iterationID
@@ -250,51 +250,50 @@ public class StreamGraph extends StreamingPlan {
         * @param waitTime
         *            Max waiting time for next record
         */
-       public void addIterationTail(String vertexName, String iterationTail, 
Integer iterationID,
+       public void addIterationTail(Integer vertexID, Integer iterationTail, 
Integer iterationID,
                        long waitTime) {
 
                if (bufferTimeouts.get(iterationTail) == 0) {
                        throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
                }
 
-               addVertex(vertexName, StreamIterationTail.class, null, null, 
getParallelism(iterationTail));
+               addVertex(vertexID, StreamIterationTail.class, null, null, 
getParallelism(iterationTail));
 
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoTailName.put(iterationID, vertexName);
+               iterationIds.put(vertexID, iterationID);
+               iterationIDtoTailID.put(iterationID, vertexID);
 
-               setSerializersFrom(iterationTail, vertexName);
-               iterationTimeouts.put(iterationIDtoTailName.get(iterationID), 
waitTime);
+               setSerializersFrom(iterationTail, vertexID);
+               iterationTimeouts.put(iterationIDtoTailID.get(iterationID), 
waitTime);
 
-               setParallelism(iterationIDtoHeadName.get(iterationID), 
getParallelism(iterationTail));
-               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeouts.get(iterationTail));
+               setParallelism(iterationIDtoHeadID.get(iterationID), 
getParallelism(iterationTail));
+               setBufferTimeout(iterationIDtoHeadID.get(iterationID), 
bufferTimeouts.get(iterationTail));
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SINK: {}", vertexName);
+                       LOG.debug("ITERATION SINK: {}", vertexID);
                }
 
        }
 
-       public <IN1, IN2, OUT> void addCoTask(String vertexName,
+       public <IN1, IN2, OUT> void addCoTask(Integer vertexID,
                        CoInvokable<IN1, IN2, OUT> taskInvokableObject, 
TypeInformation<IN1> in1TypeInfo,
                        TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo,
                        String operatorName, int parallelism) {
 
-               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
+               addVertex(vertexID, CoStreamVertex.class, taskInvokableObject, 
operatorName, parallelism);
 
-               addTypeSerializers(vertexName,
-                               new StreamRecordSerializer<IN1>(in1TypeInfo, 
executionConfig),
+               addTypeSerializers(vertexID, new 
StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
                                new StreamRecordSerializer<IN2>(in2TypeInfo, 
executionConfig),
                                new StreamRecordSerializer<OUT>(outTypeInfo, 
executionConfig), null);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("CO-TASK: {}", vertexName);
+                       LOG.debug("CO-TASK: {}", vertexID);
                }
        }
 
        /**
         * Sets vertex parameters in the JobGraph
         * 
-        * @param vertexName
+        * @param vertexID
         *            Name of the vertex
         * @param vertexClass
         *            The class of the vertex
@@ -305,30 +304,30 @@ public class StreamGraph extends StreamingPlan {
         * @param parallelism
         *            Number of parallel instances created
         */
-       private void addVertex(String vertexName, Class<? extends 
AbstractInvokable> vertexClass,
+       private void addVertex(Integer vertexID, Class<? extends 
AbstractInvokable> vertexClass,
                        StreamInvokable<?, ?> invokableObject, String 
operatorName, int parallelism) {
 
-               jobVertexClasses.put(vertexName, vertexClass);
-               setParallelism(vertexName, parallelism);
-               invokableObjects.put(vertexName, invokableObject);
-               operatorNames.put(vertexName, operatorName);
-               outEdgeLists.put(vertexName, new ArrayList<String>());
-               outEdgeTypes.put(vertexName, new ArrayList<Integer>());
-               selectedNames.put(vertexName, new ArrayList<List<String>>());
-               outputSelectors.put(vertexName, new 
ArrayList<OutputSelector<?>>());
-               inEdgeLists.put(vertexName, new ArrayList<String>());
-               outputPartitioners.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
-               iterationTailCount.put(vertexName, 0);
+               jobVertexClasses.put(vertexID, vertexClass);
+               setParallelism(vertexID, parallelism);
+               invokableObjects.put(vertexID, invokableObject);
+               operatorNames.put(vertexID, operatorName);
+               outEdgeLists.put(vertexID, new ArrayList<Integer>());
+               outEdgeTypes.put(vertexID, new ArrayList<Integer>());
+               selectedNames.put(vertexID, new ArrayList<List<String>>());
+               outputSelectors.put(vertexID, new 
ArrayList<OutputSelector<?>>());
+               inEdgeLists.put(vertexID, new ArrayList<Integer>());
+               outputPartitioners.put(vertexID, new 
ArrayList<StreamPartitioner<?>>());
+               iterationTailCount.put(vertexID, 0);
        }
 
        /**
         * Connects two vertices in the JobGraph using the selected partitioner
         * settings
         * 
-        * @param upStreamVertexName
-        *            Name of the upstream(output) vertex
-        * @param downStreamVertexName
-        *            Name of the downstream(input) vertex
+        * @param upStreamVertexID
+        *            ID of the upstream(output) vertex
+        * @param downStreamVertexID
+        *            ID of the downstream(input) vertex
         * @param partitionerObject
         *            Partitioner object
         * @param typeNumber
@@ -336,16 +335,16 @@ public class StreamGraph extends StreamingPlan {
         * @param outputNames
         *            User defined names of the out edge
         */
-       public void setEdge(String upStreamVertexName, String 
downStreamVertexName,
+       public void setEdge(Integer upStreamVertexID, Integer 
downStreamVertexID,
                        StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames) {
-               outEdgeLists.get(upStreamVertexName).add(downStreamVertexName);
-               outEdgeTypes.get(upStreamVertexName).add(typeNumber);
-               inEdgeLists.get(downStreamVertexName).add(upStreamVertexName);
-               
outputPartitioners.get(upStreamVertexName).add(partitionerObject);
-               selectedNames.get(upStreamVertexName).add(outputNames);
+               outEdgeLists.get(upStreamVertexID).add(downStreamVertexID);
+               outEdgeTypes.get(upStreamVertexID).add(typeNumber);
+               inEdgeLists.get(downStreamVertexID).add(upStreamVertexID);
+               outputPartitioners.get(upStreamVertexID).add(partitionerObject);
+               selectedNames.get(upStreamVertexID).add(outputNames);
        }
 
-       public void removeEdge(String upStream, String downStream) {
+       public void removeEdge(Integer upStream, Integer downStream) {
                int inputIndex = getInEdges(downStream).indexOf(upStream);
                inEdgeLists.get(downStream).remove(inputIndex);
 
@@ -356,71 +355,71 @@ public class StreamGraph extends StreamingPlan {
                outputPartitioners.get(upStream).remove(outputIndex);
        }
 
-       public void removeVertex(String toRemove) {
-               List<String> outEdges = new 
ArrayList<String>(getOutEdges(toRemove));
-               List<String> inEdges = new 
ArrayList<String>(getInEdges(toRemove));
+       public void removeVertex(Integer toRemove) {
+               List<Integer> outEdges = new 
ArrayList<Integer>(getOutEdges(toRemove));
+               List<Integer> inEdges = new 
ArrayList<Integer>(getInEdges(toRemove));
 
-               for (String output : outEdges) {
+               for (Integer output : outEdges) {
                        removeEdge(toRemove, output);
                }
 
-               for (String input : inEdges) {
+               for (Integer input : inEdges) {
                        removeEdge(input, toRemove);
                }
 
-               for (Map<String, ?> map : containingMaps) {
+               for (Map<Integer, ?> map : containingMaps) {
                        map.remove(toRemove);
                }
 
        }
 
-       private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
+       private void addTypeSerializers(Integer vertexID, 
StreamRecordSerializer<?> in1,
                        StreamRecordSerializer<?> in2, 
StreamRecordSerializer<?> out1,
                        StreamRecordSerializer<?> out2) {
-               typeSerializersIn1.put(vertexName, in1);
-               typeSerializersIn2.put(vertexName, in2);
-               typeSerializersOut1.put(vertexName, out1);
-               typeSerializersOut2.put(vertexName, out2);
+               typeSerializersIn1.put(vertexID, in1);
+               typeSerializersIn2.put(vertexID, in2);
+               typeSerializersOut1.put(vertexID, out1);
+               typeSerializersOut2.put(vertexID, out2);
        }
 
        /**
         * Sets the number of parallel instances created for the given vertex.
         * 
-        * @param vertexName
-        *            Name of the vertex
+        * @param vertexID
+        *            ID of the vertex
         * @param parallelism
         *            Number of parallel instances created
         */
-       public void setParallelism(String vertexName, int parallelism) {
-               operatorParallelisms.put(vertexName, parallelism);
+       public void setParallelism(Integer vertexID, int parallelism) {
+               operatorParallelisms.put(vertexID, parallelism);
        }
 
-       public int getParallelism(String vertexName) {
-               return operatorParallelisms.get(vertexName);
+       public int getParallelism(Integer vertexID) {
+               return operatorParallelisms.get(vertexID);
        }
 
        /**
         * Sets the input format for the given vertex.
         * 
-        * @param vertexName
+        * @param vertexID
         *            Name of the vertex
         * @param inputFormat
         *            input format of the file source associated with the given
         *            vertex
         */
-       public void setInputFormat(String vertexName, InputFormat<String, ?> 
inputFormat) {
-               inputFormatLists.put(vertexName, inputFormat);
+       public void setInputFormat(Integer vertexID, InputFormat<String, ?> 
inputFormat) {
+               inputFormatLists.put(vertexID, inputFormat);
        }
 
-       public void setBufferTimeout(String vertexName, long bufferTimeout) {
-               this.bufferTimeouts.put(vertexName, bufferTimeout);
+       public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
+               this.bufferTimeouts.put(vertexID, bufferTimeout);
        }
 
-       public long getBufferTimeout(String vertexName) {
-               return this.bufferTimeouts.get(vertexName);
+       public long getBufferTimeout(Integer vertexID) {
+               return this.bufferTimeouts.get(vertexID);
        }
 
-       public void addOperatorState(String veretxName, String stateName, 
OperatorState<?> state) {
+       public void addOperatorState(Integer veretxName, String stateName, 
OperatorState<?> state) {
                Map<String, OperatorState<?>> states = 
operatorStates.get(veretxName);
                if (states == null) {
                        states = new HashMap<String, OperatorState<?>>();
@@ -440,52 +439,52 @@ public class StreamGraph extends StreamingPlan {
         * Sets a user defined {@link OutputSelector} for the given operator. 
Used
         * for directed emits.
         * 
-        * @param vertexName
+        * @param vertexID
         *            Name of the vertex for which the output selector will be 
set
         * @param outputSelector
         *            The user defined output selector.
         */
-       public <T> void setOutputSelector(String vertexName, OutputSelector<T> 
outputSelector) {
-               outputSelectors.get(vertexName).add(outputSelector);
+       public <T> void setOutputSelector(Integer vertexID, OutputSelector<T> 
outputSelector) {
+               outputSelectors.get(vertexID).add(outputSelector);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Outputselector set for {}", vertexName);
+                       LOG.debug("Outputselector set for {}", vertexID);
                }
 
        }
 
-       public <IN, OUT> void setInvokable(String vertexName, 
StreamInvokable<IN, OUT> invokableObject) {
-               invokableObjects.put(vertexName, invokableObject);
+       public <IN, OUT> void setInvokable(Integer vertexID, 
StreamInvokable<IN, OUT> invokableObject) {
+               invokableObjects.put(vertexID, invokableObject);
        }
 
-       public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
+       public <OUT> void setOutType(Integer id, TypeInformation<OUT> outType) {
                StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType,
                                executionConfig);
                typeSerializersOut1.put(id, serializer);
        }
 
-       public StreamInvokable<?, ?> getInvokable(String vertexName) {
-               return invokableObjects.get(vertexName);
+       public StreamInvokable<?, ?> getInvokable(Integer vertexID) {
+               return invokableObjects.get(vertexID);
        }
 
        @SuppressWarnings("unchecked")
-       public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String 
vertexName) {
-               return (StreamRecordSerializer<OUT>) 
typeSerializersOut1.get(vertexName);
+       public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(Integer 
vertexID) {
+               return (StreamRecordSerializer<OUT>) 
typeSerializersOut1.get(vertexID);
        }
 
        @SuppressWarnings("unchecked")
-       public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String 
vertexName) {
-               return (StreamRecordSerializer<OUT>) 
typeSerializersOut2.get(vertexName);
+       public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(Integer 
vertexID) {
+               return (StreamRecordSerializer<OUT>) 
typeSerializersOut2.get(vertexID);
        }
 
        @SuppressWarnings("unchecked")
-       public <IN> StreamRecordSerializer<IN> getInSerializer1(String 
vertexName) {
-               return (StreamRecordSerializer<IN>) 
typeSerializersIn1.get(vertexName);
+       public <IN> StreamRecordSerializer<IN> getInSerializer1(Integer 
vertexID) {
+               return (StreamRecordSerializer<IN>) 
typeSerializersIn1.get(vertexID);
        }
 
        @SuppressWarnings("unchecked")
-       public <IN> StreamRecordSerializer<IN> getInSerializer2(String 
vertexName) {
-               return (StreamRecordSerializer<IN>) 
typeSerializersIn2.get(vertexName);
+       public <IN> StreamRecordSerializer<IN> getInSerializer2(Integer 
vertexID) {
+               return (StreamRecordSerializer<IN>) 
typeSerializersIn2.get(vertexID);
        }
 
        /**
@@ -497,7 +496,7 @@ public class StreamGraph extends StreamingPlan {
         * @param to
         *            to
         */
-       public void setSerializersFrom(String from, String to) {
+       public void setSerializersFrom(Integer from, Integer to) {
                operatorNames.put(to, operatorNames.get(from));
 
                typeSerializersIn1.put(to, typeSerializersOut1.get(from));
@@ -539,33 +538,33 @@ public class StreamGraph extends StreamingPlan {
                this.chaining = chaining;
        }
 
-       public Set<Entry<String, StreamInvokable<?, ?>>> getInvokables() {
+       public Set<Entry<Integer, StreamInvokable<?, ?>>> getInvokables() {
                return invokableObjects.entrySet();
        }
 
-       public Collection<String> getSources() {
+       public Collection<Integer> getSources() {
                return sources;
        }
 
-       public List<String> getOutEdges(String vertexName) {
-               return outEdgeLists.get(vertexName);
+       public List<Integer> getOutEdges(Integer vertexID) {
+               return outEdgeLists.get(vertexID);
        }
 
-       public List<String> getInEdges(String vertexName) {
-               return inEdgeLists.get(vertexName);
+       public List<Integer> getInEdges(Integer vertexID) {
+               return inEdgeLists.get(vertexID);
        }
 
-       public List<Integer> getOutEdgeTypes(String vertexName) {
+       public List<Integer> getOutEdgeTypes(Integer vertexID) {
 
-               return outEdgeTypes.get(vertexName);
+               return outEdgeTypes.get(vertexID);
        }
 
-       public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, 
String downStreamVertex) {
+       public StreamPartitioner<?> getOutPartitioner(Integer upStreamVertex, 
Integer downStreamVertex) {
                return outputPartitioners.get(upStreamVertex).get(
                                
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
        }
 
-       public List<String> getSelectedNames(String upStreamVertex, String 
downStreamVertex) {
+       public List<String> getSelectedNames(Integer upStreamVertex, Integer 
downStreamVertex) {
 
                return selectedNames.get(upStreamVertex).get(
                                
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
@@ -575,40 +574,40 @@ public class StreamGraph extends StreamingPlan {
                return new HashSet<Integer>(iterationIds.values());
        }
 
-       public String getIterationTail(int iterID) {
-               return iterationIDtoTailName.get(iterID);
+       public Integer getIterationTail(int iterID) {
+               return iterationIDtoTailID.get(iterID);
        }
 
-       public String getIterationHead(int iterID) {
-               return iterationIDtoHeadName.get(iterID);
+       public Integer getIterationHead(int iterID) {
+               return iterationIDtoHeadID.get(iterID);
        }
 
-       public Class<? extends AbstractInvokable> getJobVertexClass(String 
vertexName) {
-               return jobVertexClasses.get(vertexName);
+       public Class<? extends AbstractInvokable> getJobVertexClass(Integer 
vertexID) {
+               return jobVertexClasses.get(vertexID);
        }
 
-       public InputFormat<String, ?> getInputFormat(String vertexName) {
-               return inputFormatLists.get(vertexName);
+       public InputFormat<String, ?> getInputFormat(Integer vertexID) {
+               return inputFormatLists.get(vertexID);
        }
 
-       public List<OutputSelector<?>> getOutputSelector(String vertexName) {
-               return outputSelectors.get(vertexName);
+       public List<OutputSelector<?>> getOutputSelector(Integer vertexID) {
+               return outputSelectors.get(vertexID);
        }
 
-       public Map<String, OperatorState<?>> getState(String vertexName) {
-               return operatorStates.get(vertexName);
+       public Map<String, OperatorState<?>> getState(Integer vertexID) {
+               return operatorStates.get(vertexID);
        }
 
-       public Integer getIterationID(String vertexName) {
-               return iterationIds.get(vertexName);
+       public Integer getIterationID(Integer vertexID) {
+               return iterationIds.get(vertexID);
        }
 
-       public long getIterationTimeout(String vertexName) {
-               return iterationTimeouts.get(vertexName);
+       public long getIterationTimeout(Integer vertexID) {
+               return iterationTimeouts.get(vertexID);
        }
 
-       public String getOperatorName(String vertexName) {
-               return operatorNames.get(vertexName);
+       public String getOperatorName(Integer vertexID) {
+               return operatorNames.get(vertexID);
        }
 
        @Override
@@ -621,18 +620,14 @@ public class StreamGraph extends StreamingPlan {
                        JSONArray nodes = new JSONArray();
 
                        json.put("nodes", nodes);
-                       List<Integer> operatorIDs = new ArrayList<Integer>();
-                       for (String id : operatorNames.keySet()) {
-                               operatorIDs.add(Integer.valueOf(id));
-                       }
+                       List<Integer> operatorIDs = new 
ArrayList<Integer>(operatorNames.keySet());
                        Collections.sort(operatorIDs);
 
-                       for (Integer idInt : operatorIDs) {
+                       for (Integer id : operatorIDs) {
                                JSONObject node = new JSONObject();
                                nodes.put(node);
 
-                               String id = idInt.toString();
-                               node.put("id", idInt);
+                               node.put("id", id);
                                node.put("type", getOperatorName(id));
 
                                if (sources.contains(id)) {
@@ -658,12 +653,12 @@ public class StreamGraph extends StreamingPlan {
 
                                        for (int i = 0; i < numIn; i++) {
 
-                                               String inID = 
getInEdges(id).get(i);
+                                               Integer inID = 
getInEdges(id).get(i);
 
                                                JSONObject input = new 
JSONObject();
                                                inputs.put(input);
 
-                                               input.put("id", 
Integer.valueOf(inID));
+                                               input.put("id", inID);
                                                input.put("ship_strategy", 
getOutPartitioner(inID, id).getStrategy());
                                                if (i == 0) {
                                                        input.put("side", 
"first");

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 9beea89..8ec8486 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -49,24 +49,24 @@ public class StreamingJobGraphGenerator {
 
        private StreamGraph streamGraph;
 
-       private Map<String, AbstractJobVertex> streamVertices;
+       private Map<Integer, AbstractJobVertex> streamVertices;
        private JobGraph jobGraph;
-       private Collection<String> builtNodes;
+       private Collection<Integer> builtVertices;
 
-       private Map<String, Map<String, StreamConfig>> chainedConfigs;
-       private Map<String, StreamConfig> vertexConfigs;
-       private Map<String, String> chainedNames;
+       private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+       private Map<Integer, StreamConfig> vertexConfigs;
+       private Map<Integer, String> chainedNames;
 
        public StreamingJobGraphGenerator(StreamGraph streamGraph) {
                this.streamGraph = streamGraph;
        }
 
        private void init() {
-               this.streamVertices = new HashMap<String, AbstractJobVertex>();
-               this.builtNodes = new HashSet<String>();
-               this.chainedConfigs = new HashMap<String, Map<String, 
StreamConfig>>();
-               this.vertexConfigs = new HashMap<String, StreamConfig>();
-               this.chainedNames = new HashMap<String, String>();
+               this.streamVertices = new HashMap<Integer, AbstractJobVertex>();
+               this.builtVertices = new HashSet<Integer>();
+               this.chainedConfigs = new HashMap<Integer, Map<Integer, 
StreamConfig>>();
+               this.vertexConfigs = new HashMap<Integer, StreamConfig>();
+               this.chainedNames = new HashMap<Integer, String>();
        }
 
        public JobGraph createJobGraph(String jobName) {
@@ -85,20 +85,20 @@ public class StreamingJobGraphGenerator {
        }
 
        private void setChaining() {
-               for (String sourceName : streamGraph.getSources()) {
+               for (Integer sourceName : streamGraph.getSources()) {
                        createChain(sourceName, sourceName);
                }
        }
 
-       private List<Tuple2<String, String>> createChain(String startNode, 
String current) {
+       private List<Tuple2<Integer, Integer>> createChain(Integer startNode, 
Integer current) {
 
-               if (!builtNodes.contains(startNode)) {
+               if (!builtVertices.contains(startNode)) {
 
-                       List<Tuple2<String, String>> transitiveOutEdges = new 
ArrayList<Tuple2<String, String>>();
-                       List<String> chainableOutputs = new ArrayList<String>();
-                       List<String> nonChainableOutputs = new 
ArrayList<String>();
+                       List<Tuple2<Integer, Integer>> transitiveOutEdges = new 
ArrayList<Tuple2<Integer, Integer>>();
+                       List<Integer> chainableOutputs = new 
ArrayList<Integer>();
+                       List<Integer> nonChainableOutputs = new 
ArrayList<Integer>();
 
-                       for (String outName : streamGraph.getOutEdges(current)) 
{
+                       for (Integer outName : 
streamGraph.getOutEdges(current)) {
                                if (isChainable(current, outName)) {
                                        chainableOutputs.add(outName);
                                } else {
@@ -106,12 +106,12 @@ public class StreamingJobGraphGenerator {
                                }
                        }
 
-                       for (String chainable : chainableOutputs) {
+                       for (Integer chainable : chainableOutputs) {
                                
transitiveOutEdges.addAll(createChain(startNode, chainable));
                        }
 
-                       for (String nonChainable : nonChainableOutputs) {
-                               transitiveOutEdges.add(new Tuple2<String, 
String>(current, nonChainable));
+                       for (Integer nonChainable : nonChainableOutputs) {
+                               transitiveOutEdges.add(new Tuple2<Integer, 
Integer>(current, nonChainable));
                                createChain(nonChainable, nonChainable);
                        }
 
@@ -127,7 +127,7 @@ public class StreamingJobGraphGenerator {
                                config.setChainStart();
                                config.setOutEdgesInOrder(transitiveOutEdges);
 
-                               for (Tuple2<String, String> edge : 
transitiveOutEdges) {
+                               for (Tuple2<Integer, Integer> edge : 
transitiveOutEdges) {
                                        connect(startNode, edge);
                                }
 
@@ -135,10 +135,10 @@ public class StreamingJobGraphGenerator {
 
                        } else {
 
-                               Map<String, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
+                               Map<Integer, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
 
                                if (chainedConfs == null) {
-                                       chainedConfigs.put(startNode, new 
HashMap<String, StreamConfig>());
+                                       chainedConfigs.put(startNode, new 
HashMap<Integer, StreamConfig>());
                                }
                                chainedConfigs.get(startNode).put(current, 
config);
                        }
@@ -146,113 +146,113 @@ public class StreamingJobGraphGenerator {
                        return transitiveOutEdges;
 
                } else {
-                       return new ArrayList<Tuple2<String, String>>();
+                       return new ArrayList<Tuple2<Integer, Integer>>();
                }
        }
 
-       private String createChainedName(String vertexID, List<String> 
chainedOutputs) {
-               String vertexName = streamGraph.getOperatorName(vertexID);
+       private String createChainedName(Integer vertexID, List<Integer> 
chainedOutputs) {
+               String operatorName = streamGraph.getOperatorName(vertexID);
                if (chainedOutputs.size() > 1) {
                        List<String> outputChainedNames = new 
ArrayList<String>();
-                       for (String chainable : chainedOutputs) {
+                       for (Integer chainable : chainedOutputs) {
                                
outputChainedNames.add(chainedNames.get(chainable));
                        }
-                       return vertexName + " -> (" + 
StringUtils.join(outputChainedNames, ", ") + ")";
+                       return operatorName + " -> (" + 
StringUtils.join(outputChainedNames, ", ") + ")";
                } else if (chainedOutputs.size() == 1) {
-                       return vertexName + " -> " + 
chainedNames.get(chainedOutputs.get(0));
+                       return operatorName + " -> " + 
chainedNames.get(chainedOutputs.get(0));
                } else {
-                       return vertexName;
+                       return operatorName;
                }
 
        }
 
-       private StreamConfig createProcessingVertex(String vertexName) {
+       private StreamConfig createProcessingVertex(Integer vertexID) {
 
-               AbstractJobVertex vertex = new 
AbstractJobVertex(chainedNames.get(vertexName));
+               AbstractJobVertex vertex = new 
AbstractJobVertex(chainedNames.get(vertexID));
 
-               
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName));
-               if (streamGraph.getParallelism(vertexName) > 0) {
-                       
vertex.setParallelism(streamGraph.getParallelism(vertexName));
+               
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexID));
+               if (streamGraph.getParallelism(vertexID) > 0) {
+                       
vertex.setParallelism(streamGraph.getParallelism(vertexID));
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Parallelism set: {} for {}", 
streamGraph.getParallelism(vertexName),
-                                       vertexName);
+                       LOG.debug("Parallelism set: {} for {}", 
streamGraph.getParallelism(vertexID),
+                                       vertexID);
                }
 
-               if (streamGraph.getInputFormat(vertexName) != null) {
-                       
vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName));
+               if (streamGraph.getInputFormat(vertexID) != null) {
+                       
vertex.setInputSplitSource(streamGraph.getInputFormat(vertexID));
                }
 
-               streamVertices.put(vertexName, vertex);
-               builtNodes.add(vertexName);
+               streamVertices.put(vertexID, vertex);
+               builtVertices.add(vertexID);
                jobGraph.addVertex(vertex);
 
                return new StreamConfig(vertex.getConfiguration());
        }
 
-       private void setVertexConfig(String vertexName, StreamConfig config,
-                       List<String> chainableOutputs, List<String> 
nonChainableOutputs) {
+       private void setVertexConfig(Integer vertexID, StreamConfig config,
+                       List<Integer> chainableOutputs, List<Integer> 
nonChainableOutputs) {
 
-               config.setVertexName(vertexName);
-               
config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName));
+               config.setVertexID(vertexID);
+               config.setBufferTimeout(streamGraph.getBufferTimeout(vertexID));
 
-               
config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName));
-               
config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName));
-               
config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName));
-               
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
+               
config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexID));
+               
config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexID));
+               
config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexID));
+               
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexID));
 
-               config.setUserInvokable(streamGraph.getInvokable(vertexName));
-               
config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
-               config.setOperatorStates(streamGraph.getState(vertexName));
+               config.setUserInvokable(streamGraph.getInvokable(vertexID));
+               
config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));
+               config.setOperatorStates(streamGraph.getState(vertexID));
 
                config.setNumberOfOutputs(nonChainableOutputs.size());
                config.setOutputs(nonChainableOutputs);
                config.setChainedOutputs(chainableOutputs);
 
-               Class<? extends AbstractInvokable> vertexClass = 
streamGraph.getJobVertexClass(vertexName);
+               Class<? extends AbstractInvokable> vertexClass = 
streamGraph.getJobVertexClass(vertexID);
 
                if (vertexClass.equals(StreamIterationHead.class)
                                || 
vertexClass.equals(StreamIterationTail.class)) {
-                       
config.setIterationId(streamGraph.getIterationID(vertexName));
-                       
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName));
+                       
config.setIterationId(streamGraph.getIterationID(vertexID));
+                       
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexID));
                }
 
-               List<String> allOutputs = new 
ArrayList<String>(chainableOutputs);
+               List<Integer> allOutputs = new 
ArrayList<Integer>(chainableOutputs);
                allOutputs.addAll(nonChainableOutputs);
 
-               for (String output : allOutputs) {
-                       config.setSelectedNames(output, 
streamGraph.getSelectedNames(vertexName, output));
+               for (Integer output : allOutputs) {
+                       config.setSelectedNames(output, 
streamGraph.getSelectedNames(vertexID, output));
                }
 
-               vertexConfigs.put(vertexName, config);
+               vertexConfigs.put(vertexID, config);
        }
 
-       private <T> void connect(String headOfChain, Tuple2<String, String> 
edge) {
+       private <T> void connect(Integer headOfChain, Tuple2<Integer, Integer> 
edge) {
 
-               String upStreamVertexName = edge.f0;
-               String downStreamVertexName = edge.f1;
+               Integer upStreamvertexID = edge.f0;
+               Integer downStreamvertexID = edge.f1;
 
-               int outputIndex = 
streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName);
+               int outputIndex = 
streamGraph.getOutEdges(upStreamvertexID).indexOf(downStreamvertexID);
 
                AbstractJobVertex headVertex = streamVertices.get(headOfChain);
-               AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamVertexName);
+               AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamvertexID);
 
                StreamConfig downStreamConfig = new 
StreamConfig(downStreamVertex.getConfiguration());
-               StreamConfig upStreamConfig = headOfChain == upStreamVertexName 
? new StreamConfig(
+               StreamConfig upStreamConfig = headOfChain == upStreamvertexID ? 
new StreamConfig(
                                headVertex.getConfiguration()) : 
chainedConfigs.get(headOfChain).get(
-                               upStreamVertexName);
+                               upStreamvertexID);
 
-               List<Integer> outEdgeIndexList = 
streamGraph.getOutEdgeTypes(upStreamVertexName);
+               List<Integer> outEdgeIndexList = 
streamGraph.getOutEdgeTypes(upStreamvertexID);
                int numOfInputs = downStreamConfig.getNumberOfInputs();
 
                downStreamConfig.setInputIndex(numOfInputs++, 
outEdgeIndexList.get(outputIndex));
                downStreamConfig.setNumberOfInputs(numOfInputs);
 
-               StreamPartitioner<?> partitioner = 
streamGraph.getOutPartitioner(upStreamVertexName,
-                               downStreamVertexName);
+               StreamPartitioner<?> partitioner = 
streamGraph.getOutPartitioner(upStreamvertexID,
+                               downStreamvertexID);
 
-               upStreamConfig.setPartitioner(downStreamVertexName, 
partitioner);
+               upStreamConfig.setPartitioner(downStreamvertexID, partitioner);
 
                if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
                        downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
@@ -262,13 +262,13 @@ public class StreamingJobGraphGenerator {
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("CONNECTED: {} - {} -> {}", 
partitioner.getClass().getSimpleName(),
-                                       headOfChain, downStreamVertexName);
+                                       headOfChain, downStreamvertexID);
                }
        }
 
-       private boolean isChainable(String vertexName, String outName) {
+       private boolean isChainable(Integer vertexID, Integer outName) {
 
-               StreamInvokable<?, ?> headInvokable = 
streamGraph.getInvokable(vertexName);
+               StreamInvokable<?, ?> headInvokable = 
streamGraph.getInvokable(vertexID);
                StreamInvokable<?, ?> outInvokable = 
streamGraph.getInvokable(outName);
 
                return streamGraph.getInEdges(outName).size() == 1
@@ -276,8 +276,8 @@ public class StreamingJobGraphGenerator {
                                && outInvokable.getChainingStrategy() == 
ChainingStrategy.ALWAYS
                                && (headInvokable.getChainingStrategy() == 
ChainingStrategy.HEAD || headInvokable
                                                .getChainingStrategy() == 
ChainingStrategy.ALWAYS)
-                               && streamGraph.getOutPartitioner(vertexName, 
outName).getStrategy() == PartitioningStrategy.FORWARD
-                               && streamGraph.getParallelism(vertexName) == 
streamGraph.getParallelism(outName)
+                               && streamGraph.getOutPartitioner(vertexID, 
outName).getStrategy() == PartitioningStrategy.FORWARD
+                               && streamGraph.getParallelism(vertexID) == 
streamGraph.getParallelism(outName)
                                && streamGraph.chaining;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java
index f0fa772..62fd5cf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java
@@ -30,13 +30,13 @@ import 
org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscret
 public class WindowingOptimzier {
 
        public static void optimizeGraph(StreamGraph streamGraph) {
-               Set<Entry<String, StreamInvokable<?, ?>>> invokables = 
streamGraph.getInvokables();
-               List<Tuple2<String, StreamDiscretizer<?>>> discretizers = new 
ArrayList<Tuple2<String, StreamDiscretizer<?>>>();
+               Set<Entry<Integer, StreamInvokable<?, ?>>> invokables = 
streamGraph.getInvokables();
+               List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new 
ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>();
 
                // Get the discretizers
-               for (Entry<String, StreamInvokable<?, ?>> entry : invokables) {
+               for (Entry<Integer, StreamInvokable<?, ?>> entry : invokables) {
                        if (entry.getValue() instanceof StreamDiscretizer) {
-                               discretizers.add(new Tuple2<String, 
StreamDiscretizer<?>>(entry.getKey(),
+                               discretizers.add(new Tuple2<Integer, 
StreamDiscretizer<?>>(entry.getKey(),
                                                (StreamDiscretizer<?>) 
entry.getValue()));
                        }
                }
@@ -46,15 +46,15 @@ public class WindowingOptimzier {
        }
 
        private static void setDiscretizerReuse(StreamGraph streamGraph,
-                       List<Tuple2<String, StreamDiscretizer<?>>> 
discretizers) {
-               List<Tuple2<StreamDiscretizer<?>, List<String>>> 
matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, 
List<String>>>();
+                       List<Tuple2<Integer, StreamDiscretizer<?>>> 
discretizers) {
+               List<Tuple2<StreamDiscretizer<?>, List<Integer>>> 
matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, 
List<Integer>>>();
 
-               for (Tuple2<String, StreamDiscretizer<?>> discretizer : 
discretizers) {
+               for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : 
discretizers) {
                        boolean inMatching = false;
-                       for (Tuple2<StreamDiscretizer<?>, List<String>> 
matching : matchingDiscretizers) {
-                               Set<String> discretizerInEdges = new 
HashSet<String>(
+                       for (Tuple2<StreamDiscretizer<?>, List<Integer>> 
matching : matchingDiscretizers) {
+                               Set<Integer> discretizerInEdges = new 
HashSet<Integer>(
                                                
streamGraph.getInEdges(discretizer.f0));
-                               Set<String> matchingInEdges = new 
HashSet<String>(
+                               Set<Integer> matchingInEdges = new 
HashSet<Integer>(
                                                
streamGraph.getInEdges(matching.f1.get(0)));
 
                                if (discretizer.f1.equals(matching.f0)
@@ -65,17 +65,17 @@ public class WindowingOptimzier {
                                }
                        }
                        if (!inMatching) {
-                               List<String> matchingNames = new 
ArrayList<String>();
+                               List<Integer> matchingNames = new 
ArrayList<Integer>();
                                matchingNames.add(discretizer.f0);
-                               matchingDiscretizers.add(new 
Tuple2<StreamDiscretizer<?>, List<String>>(
+                               matchingDiscretizers.add(new 
Tuple2<StreamDiscretizer<?>, List<Integer>>(
                                                discretizer.f1, matchingNames));
                        }
                }
 
-               for (Tuple2<StreamDiscretizer<?>, List<String>> matching : 
matchingDiscretizers) {
-                       List<String> matchList = matching.f1;
+               for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : 
matchingDiscretizers) {
+                       List<Integer> matchList = matching.f1;
                        if (matchList.size() > 1) {
-                               String first = matchList.get(0);
+                               Integer first = matchList.get(0);
                                for (int i = 1; i < matchList.size(); i++) {
                                        replaceDiscretizer(streamGraph, 
matchList.get(i), first);
                                }
@@ -83,26 +83,26 @@ public class WindowingOptimzier {
                }
        }
 
-       private static void replaceDiscretizer(StreamGraph streamGraph, String 
toReplace,
-                       String replaceWith) {
+       private static void replaceDiscretizer(StreamGraph streamGraph, Integer 
toReplace,
+                       Integer replaceWith) {
                // Convert to array to create a copy
-               List<String> outEdges = new 
ArrayList<String>(streamGraph.getOutEdges(toReplace));
+               List<Integer> outEdges = new 
ArrayList<Integer>(streamGraph.getOutEdges(toReplace));
 
                int numOutputs = outEdges.size();
 
                // Reconnect outputs
                for (int i = 0; i < numOutputs; i++) {
-                       String outName = outEdges.get(i);
+                       Integer output = outEdges.get(i);
 
-                       streamGraph.setEdge(replaceWith, outName,
-                                       
streamGraph.getOutPartitioner(toReplace, outName), 0, new ArrayList<String>());
-                       streamGraph.removeEdge(toReplace, outName);
+                       streamGraph.setEdge(replaceWith, output,
+                                       
streamGraph.getOutPartitioner(toReplace, output), 0, new ArrayList<String>());
+                       streamGraph.removeEdge(toReplace, output);
                }
 
-               List<String> inEdges = new 
ArrayList<String>(streamGraph.getInEdges(toReplace));
+               List<Integer> inEdges = new 
ArrayList<Integer>(streamGraph.getInEdges(toReplace));
                // Remove inputs
-               for (String inName : inEdges) {
-                       streamGraph.removeEdge(inName, toReplace);
+               for (Integer input : inEdges) {
+                       streamGraph.removeEdge(input, toReplace);
                }
 
                streamGraph.removeVertex(toReplace);

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f221e4b..46b71a5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -95,7 +95,7 @@ public class DataStream<OUT> {
 
        protected static Integer counter = 0;
        protected final StreamExecutionEnvironment environment;
-       protected final String id;
+       protected final Integer id;
        protected final String type;
        protected int degreeOfParallelism;
        protected List<String> userDefinedNames;
@@ -124,7 +124,7 @@ public class DataStream<OUT> {
                }
 
                counter++;
-               this.id = counter.toString();
+               this.id = counter;
                this.type = operatorType;
                this.environment = environment;
                this.degreeOfParallelism = environment.getDegreeOfParallelism();
@@ -166,7 +166,7 @@ public class DataStream<OUT> {
         * 
         * @return ID of the DataStream
         */
-       public String getId() {
+       public Integer getId() {
                return id;
        }
 
@@ -1167,7 +1167,7 @@ public class DataStream<OUT> {
         * @param typeNumber
         *            Number of the type (used at co-functions)
         */
-       protected <X> void connectGraph(DataStream<X> inputStream, String 
outputID, int typeNumber) {
+       protected <X> void connectGraph(DataStream<X> inputStream, Integer 
outputID, int typeNumber) {
                for (DataStream<X> stream : inputStream.mergedStreams) {
                        streamGraph.setEdge(stream.getId(), outputID, 
stream.partitioner, typeNumber,
                                        inputStream.userDefinedNames);
@@ -1260,9 +1260,9 @@ public class DataStream<OUT> {
                }
        }
 
-       private void validateMerge(String id) {
+       private void validateMerge(Integer id) {
                for (DataStream<OUT> ds : this.mergedStreams) {
-                       if (ds.getId().equals(id)) {
+                       if (ds.getId() == id) {
                                throw new RuntimeException("A DataStream cannot 
be merged with itself");
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 7df91f0..dc2e987 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -49,13 +49,16 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
 
        private SingleOutputStreamOperator<StreamWindow<OUT>, ?> 
discretizedStream;
        private WindowTransformation transformation;
+       protected boolean isPartitioned = false;
 
        protected 
DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> 
discretizedStream,
-                       KeySelector<OUT, ?> groupByKey, WindowTransformation 
tranformation) {
+                       KeySelector<OUT, ?> groupByKey, WindowTransformation 
tranformation,
+                       boolean isPartitioned) {
                super();
                this.groupByKey = groupByKey;
                this.discretizedStream = discretizedStream;
                this.transformation = tranformation;
+               this.isPartitioned = isPartitioned;
        }
 
        public DataStream<OUT> flatten() {
@@ -73,6 +76,8 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                                WindowTransformation.REDUCEWINDOW, "Window 
Reduce", getType(),
                                new WindowReducer<OUT>(reduceFunction)).merge();
 
+               // If we merged a non-grouped reduce transformation we need to 
reduce
+               // again
                if (!isGrouped() && out.discretizedStream.invokable instanceof 
WindowMerger) {
                        return out.transform(WindowTransformation.REDUCEWINDOW, 
"Window Reduce", out.getType(),
                                        new 
WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
@@ -93,14 +98,10 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
        public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> 
windowMapFunction,
                        TypeInformation<R> returnType) {
                DiscretizedStream<R> out = partition(transformation).transform(
-                               WindowTransformation.REDUCEWINDOW, "Window 
Map", returnType,
-                               new WindowMapper<OUT, 
R>(discretizedStream.clean(windowMapFunction)));
+                               WindowTransformation.MAPWINDOW, "Window Map", 
returnType,
+                               new WindowMapper<OUT, 
R>(discretizedStream.clean(windowMapFunction))).merge();
 
-               if (isGrouped()) {
-                       return out.merge();
-               } else {
-                       return out;
-               }
+               return out;
        }
 
        private <R> DiscretizedStream<R> transform(WindowTransformation 
transformation,
@@ -120,29 +121,36 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
                                        new 
WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism);
 
                        out.groupByKey = null;
+                       out.isPartitioned = true;
 
                        return out;
                } else if (transformation != WindowTransformation.MAPWINDOW
                                && parallelism != 
discretizedStream.getExecutionEnvironment()
                                                .getDegreeOfParallelism()) {
-                       return transform(transformation, "Window partitioner", 
getType(),
+                       DiscretizedStream<OUT> out = transform(transformation, 
"Window partitioner", getType(),
                                        new 
WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
+
+                       out.isPartitioned = true;
+
+                       return out;
                } else {
+                       this.isPartitioned = false;
                        return this;
                }
        }
 
        private DiscretizedStream<OUT> setParallelism(int parallelism) {
-               return wrap(discretizedStream.setParallelism(parallelism));
+               return wrap(discretizedStream.setParallelism(parallelism), 
isPartitioned);
        }
 
        private DiscretizedStream<OUT> merge() {
                TypeInformation<StreamWindow<OUT>> type = 
discretizedStream.getType();
 
                // Only merge partitioned streams
-               if (discretizedStream.invokable instanceof WindowPartitioner) {
-                       return wrap(discretizedStream.groupBy(new 
WindowKey<OUT>()).transform("Window Merger",
-                                       type, new WindowMerger<OUT>()));
+               if (isPartitioned) {
+                       return wrap(
+                                       discretizedStream.groupBy(new 
WindowKey<OUT>()).transform("Window Merger",
+                                                       type, new 
WindowMerger<OUT>()), false);
                } else {
                        return this;
                }
@@ -150,14 +158,15 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
        }
 
        @SuppressWarnings("rawtypes")
-       private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator 
stream) {
+       private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator 
stream, boolean isPartitioned) {
                return wrap(stream, transformation);
        }
 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
                        WindowTransformation transformation) {
-               return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) 
this.groupByKey, transformation);
+               return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) 
this.groupByKey,
+                               transformation, isPartitioned);
        }
 
        @SuppressWarnings("rawtypes")
@@ -220,7 +229,8 @@ public class DiscretizedStream<OUT> extends 
WindowedDataStream<OUT> {
        }
 
        protected DiscretizedStream<OUT> copy() {
-               return new DiscretizedStream<OUT>(discretizedStream.copy(), 
groupByKey, transformation);
+               return new DiscretizedStream<OUT>(discretizedStream.copy(), 
groupByKey, transformation,
+                               isPartitioned);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 36551d2..35b0324 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -268,10 +268,12 @@ public class WindowedDataStream<OUT> {
                WindowBuffer<OUT> windowBuffer = 
getWindowBuffer(transformation, getTrigger(),
                                getEviction(), discretizerKey);
 
+               DiscretizedStream<OUT> discretized = discretize(transformation, 
windowBuffer);
+
                if (windowBuffer instanceof CompletePreAggregator) {
-                       return discretize(transformation, windowBuffer);
+                       return discretized;
                } else {
-                       return discretize(transformation, 
windowBuffer).reduceWindow(reduceFunction);
+                       return discretized.reduceWindow(reduceFunction);
                }
        }
 
@@ -337,7 +339,8 @@ public class WindowedDataStream<OUT> {
                                .transform("Stream Discretizer", 
bufferEventType, discretizer)
                                .setParallelism(parallelism)
                                .transform("WindowBuffer", new 
StreamWindowTypeInfo<OUT>(getType()),
-                                               
bufferInvokable).setParallelism(parallelism), groupByKey, transformation);
+                                               
bufferInvokable).setParallelism(parallelism), groupByKey, transformation,
+                               false);
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 1a12cb2..911f060 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -51,9 +51,9 @@ public class OutputHandler<OUT> {
 
        public List<ChainableInvokable<?, ?>> chainedInvokables;
 
-       private Map<String, StreamOutput<?>> outputMap;
-       private Map<String, StreamConfig> chainedConfigs;
-       private List<Tuple2<String, String>> outEdgesInOrder;
+       private Map<Integer, StreamOutput<?>> outputMap;
+       private Map<Integer, StreamConfig> chainedConfigs;
+       private List<Tuple2<Integer, Integer>> outEdgesInOrder;
 
        public OutputHandler(StreamVertex<?, OUT> vertex) {
 
@@ -61,19 +61,19 @@ public class OutputHandler<OUT> {
                this.vertex = vertex;
                this.configuration = new 
StreamConfig(vertex.getTaskConfiguration());
                this.chainedInvokables = new ArrayList<ChainableInvokable<?, 
?>>();
-               this.outputMap = new HashMap<String, StreamOutput<?>>();
+               this.outputMap = new HashMap<Integer, StreamOutput<?>>();
                this.cl = vertex.getUserCodeClassLoader();
 
                // We read the chained configs, and the order of record writer
                // registrations by outputname
                this.chainedConfigs = 
configuration.getTransitiveChainedTaskConfigs(cl);
-               this.chainedConfigs.put(configuration.getTaskName(), 
configuration);
+               this.chainedConfigs.put(configuration.getVertexID(), 
configuration);
 
                this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
 
                // We iterate through all the out edges from this job vertex 
and create
                // a stream output
-               for (Tuple2<String, String> outEdge : outEdgesInOrder) {
+               for (Tuple2<Integer, Integer> outEdge : outEdgesInOrder) {
                        StreamOutput<?> streamOutput = 
createStreamOutput(outEdge.f1,
                                        chainedConfigs.get(outEdge.f0), 
outEdgesInOrder.indexOf(outEdge));
                        outputMap.put(outEdge.f1, streamOutput);
@@ -111,7 +111,7 @@ public class OutputHandler<OUT> {
                                chainedTaskConfig.getOutputSelectors(cl)) : new 
CollectorWrapper<OUT>();
 
                // Create collectors for the network outputs
-               for (String output : chainedTaskConfig.getOutputs(cl)) {
+               for (Integer output : chainedTaskConfig.getOutputs(cl)) {
 
                        Collector<?> outCollector = outputMap.get(output);
 
@@ -124,7 +124,7 @@ public class OutputHandler<OUT> {
                }
 
                // Create collectors for the chained outputs
-               for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
+               for (Integer output : chainedTaskConfig.getChainedOutputs(cl)) {
                        Collector<?> outCollector = 
createChainedCollector(chainedConfigs.get(output));
                        if (isDirectEmit) {
                                ((DirectedCollectorWrapper<OUT>) 
wrapper).addCollector(outCollector,
@@ -157,7 +157,7 @@ public class OutputHandler<OUT> {
        }
 
        /**
-        * We create the StreamOutput for the specific output given by the 
name, and
+        * We create the StreamOutput for the specific output given by the id, 
and
         * the configuration of its source task
         * 
         * @param outputVertex
@@ -166,7 +166,7 @@ public class OutputHandler<OUT> {
         *            The config of upStream task
         * @return
         */
-       private <T> StreamOutput<T> createStreamOutput(String outputVertex, 
StreamConfig configuration,
+       private <T> StreamOutput<T> createStreamOutput(Integer outputVertex, 
StreamConfig configuration,
                        int outputIndex) {
 
                StreamRecordSerializer<T> outSerializer = configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 78cbbe5..38bba5e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -94,7 +94,7 @@ public class DirectedOutputTest {
 
        @Test
        public void outputSelectorTest() throws Exception {
-               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
128);
+               StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
32);
 
                SplitDataStream<Long> source = env.generateSequence(1, 
11).split(new MyOutputSelector());
                source.select(EVEN).addSink(new ListSink(EVEN));

http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index f111898..3163c46 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -122,6 +122,9 @@ public class WindowIntegrationTest implements Serializable {
                source.groupBy(key).window(Time.of(4, ts, 
1)).max(0).getDiscretizedStream()
                                .addSink(new DistributedSink3());
 
+               source.window(Count.of(5)).mapWindow(new 
IdentityWindowMap()).flatten()
+                               .addSink(new DistributedSink4());
+
                env.execute();
 
                // sum ( Count of 2 slide 3 )
@@ -180,6 +183,14 @@ public class WindowIntegrationTest implements Serializable 
{
                expected6.add(StreamWindow.fromElements(4));
                expected6.add(StreamWindow.fromElements(10));
 
+               validateOutput(expected6, DistributedSink3.windows);
+
+               List<StreamWindow<Integer>> expected7 = new 
ArrayList<StreamWindow<Integer>>();
+               expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
+               expected7.add(StreamWindow.fromElements(5, 10, 11, 11));
+
+               validateOutput(expected7, DistributedSink4.windows);
+
        }
 
        public static <R> void validateOutput(List<R> expected, List<R> actual) 
{
@@ -263,4 +274,17 @@ public class WindowIntegrationTest implements Serializable 
{
                }
 
        }
+
+       @SuppressWarnings("serial")
+       private static class DistributedSink4 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = Collections
+                               .synchronizedList(new 
ArrayList<StreamWindow<Integer>>());
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
 }

Reply via email to