Repository: flink
Updated Branches:
  refs/heads/master ea4f339d7 -> 3b69b2499


[FLINK-2335] [streaming] Lazy iteration construction in StreamGraph

Closes #900


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b69b249
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b69b249
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b69b249

Branch: refs/heads/master
Commit: 3b69b249991c23995dddc3b5182415f5c7df332a
Parents: ea4f339
Author: Gyula Fora <[email protected]>
Authored: Fri Jul 10 20:03:22 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:23:59 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  48 +-
 .../api/datastream/IterativeDataStream.java     |  43 +-
 .../api/datastream/SplitDataStream.java         |   4 +-
 .../flink/streaming/api/graph/StreamConfig.java |   8 +-
 .../flink/streaming/api/graph/StreamEdge.java   |   6 +-
 .../flink/streaming/api/graph/StreamGraph.java  | 275 ++++++----
 .../flink/streaming/api/graph/StreamLoop.java   | 122 +++++
 .../api/graph/StreamingJobGraphGenerator.java   |  27 +-
 .../partitioner/RebalancePartitioner.java       |   5 +
 .../runtime/partitioner/StreamPartitioner.java  |   5 +
 .../runtime/tasks/StreamIterationHead.java      |   4 +-
 .../runtime/tasks/StreamIterationTail.java      |   4 +-
 .../apache/flink/streaming/api/IterateTest.java | 519 ++++++++++++++-----
 .../flink/streaming/api/scala/DataStream.scala  |   4 +
 14 files changed, 804 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c9c1f49..7896169 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -95,11 +95,11 @@ public class DataStream<OUT> {
        protected final StreamExecutionEnvironment environment;
        protected final Integer id;
        protected int parallelism;
-       protected List<String> userDefinedNames;
+       protected List<String> selectedNames;
        protected StreamPartitioner<OUT> partitioner;
        @SuppressWarnings("rawtypes")
        protected TypeInformation typeInfo;
-       protected List<DataStream<OUT>> unionizedStreams;
+       protected List<DataStream<OUT>> unionedStreams;
        
        protected Integer iterationID = null;
        protected Long iterationWaitTime = null;
@@ -126,11 +126,11 @@ public class DataStream<OUT> {
                this.environment = environment;
                this.parallelism = environment.getParallelism();
                this.streamGraph = environment.getStreamGraph();
-               this.userDefinedNames = new ArrayList<String>();
+               this.selectedNames = new ArrayList<String>();
                this.partitioner = new RebalancePartitioner<OUT>(true);
                this.typeInfo = typeInfo;
-               this.unionizedStreams = new ArrayList<DataStream<OUT>>();
-               this.unionizedStreams.add(this);
+               this.unionedStreams = new ArrayList<DataStream<OUT>>();
+               this.unionedStreams.add(this);
        }
 
        /**
@@ -143,17 +143,17 @@ public class DataStream<OUT> {
                this.environment = dataStream.environment;
                this.id = dataStream.id;
                this.parallelism = dataStream.parallelism;
-               this.userDefinedNames = new 
ArrayList<String>(dataStream.userDefinedNames);
+               this.selectedNames = new 
ArrayList<String>(dataStream.selectedNames);
                this.partitioner = dataStream.partitioner.copy();
                this.streamGraph = dataStream.streamGraph;
                this.typeInfo = dataStream.typeInfo;
                this.iterationID = dataStream.iterationID;
                this.iterationWaitTime = dataStream.iterationWaitTime;
-               this.unionizedStreams = new ArrayList<DataStream<OUT>>();
-               this.unionizedStreams.add(this);
-               if (dataStream.unionizedStreams.size() > 1) {
-                       for (int i = 1; i < dataStream.unionizedStreams.size(); 
i++) {
-                               this.unionizedStreams.add(new 
DataStream<OUT>(dataStream.unionizedStreams.get(i)));
+               this.unionedStreams = new ArrayList<DataStream<OUT>>();
+               this.unionedStreams.add(this);
+               if (dataStream.unionedStreams.size() > 1) {
+                       for (int i = 1; i < dataStream.unionedStreams.size(); 
i++) {
+                               this.unionedStreams.add(new 
DataStream<OUT>(dataStream.unionedStreams.get(i)));
                        }
                }
 
@@ -176,6 +176,14 @@ public class DataStream<OUT> {
        public int getParallelism() {
                return this.parallelism;
        }
+       
+       public StreamPartitioner<OUT> getPartitioner() {
+               return this.partitioner;
+       }
+       
+       public List<String> getSelectedNames(){
+               return selectedNames;
+       }
 
        /**
         * Gets the type of the stream.
@@ -248,9 +256,9 @@ public class DataStream<OUT> {
                DataStream<OUT> returnStream = this.copy();
 
                for (DataStream<OUT> stream : streams) {
-                       for (DataStream<OUT> ds : stream.unionizedStreams) {
+                       for (DataStream<OUT> ds : stream.unionedStreams) {
                                validateUnion(ds.getId());
-                               returnStream.unionizedStreams.add(ds.copy());
+                               returnStream.unionedStreams.add(ds.copy());
                        }
                }
                return returnStream;
@@ -268,7 +276,7 @@ public class DataStream<OUT> {
         * @return The {@link SplitDataStream}
         */
        public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-               for (DataStream<OUT> ds : this.unionizedStreams) {
+               for (DataStream<OUT> ds : this.unionedStreams) {
                        streamGraph.addOutputSelector(ds.getId(), 
clean(outputSelector));
                }
 
@@ -1103,9 +1111,7 @@ public class DataStream<OUT> {
        }
        
        protected <X> void addIterationSource(DataStream<X> dataStream, 
TypeInformation<?> feedbackType) {
-               Integer id = ++counter;
-               streamGraph.addIterationHead(id, dataStream.getId(), 
iterationID, iterationWaitTime, feedbackType);
-               streamGraph.setParallelism(id, dataStream.getParallelism());
+               streamGraph.addIterationHead(dataStream.getId(), iterationID, 
iterationWaitTime, feedbackType);
        }
 
        /**
@@ -1118,7 +1124,7 @@ public class DataStream<OUT> {
        protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> 
partitioner) {
                DataStream<OUT> returnStream = this.copy();
 
-               for (DataStream<OUT> stream : returnStream.unionizedStreams) {
+               for (DataStream<OUT> stream : returnStream.unionedStreams) {
                        stream.partitioner = partitioner;
                }
 
@@ -1139,9 +1145,9 @@ public class DataStream<OUT> {
         *            Number of the type (used at co-functions)
         */
        protected <X> void connectGraph(DataStream<X> inputStream, Integer 
outputID, int typeNumber) {
-               for (DataStream<X> stream : inputStream.unionizedStreams) {
+               for (DataStream<X> stream : inputStream.unionedStreams) {
                        streamGraph.addEdge(stream.getId(), outputID, 
stream.partitioner, typeNumber,
-                                       inputStream.userDefinedNames);
+                                       inputStream.selectedNames);
                }
 
        }
@@ -1170,7 +1176,7 @@ public class DataStream<OUT> {
        }
 
        private void validateUnion(Integer id) {
-               for (DataStream<OUT> ds : this.unionizedStreams) {
+               for (DataStream<OUT> ds : this.unionedStreams) {
                        if (ds.getId().equals(id)) {
                                throw new RuntimeException("A DataStream cannot 
be merged with itself");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index da3d885..4de368c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.List;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +34,8 @@ import 
org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
  */
 public class IterativeDataStream<IN> extends
                SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
+       
+       protected boolean closed = false;
 
        static Integer iterationCount = 0;
        
@@ -60,20 +64,18 @@ public class IterativeDataStream<IN> extends
         * @return The feedback stream.
         * 
         */
+       @SuppressWarnings({ "unchecked", "rawtypes" })
        public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean 
keepPartitioning) {
-               DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "Iteration Sink", null,
-                               null);
-
-               // We add an iteration sink to the tail which will send tuples 
to the
-               // iteration head
-               streamGraph.addIterationTail(iterationSink.getId(), 
iterationTail.getId(), iterationID,
-                               iterationWaitTime);
-
-               if (keepPartitioning) {
-                       connectGraph(iterationTail, iterationSink.getId(), 0);
-               } else {
-                       connectGraph(iterationTail.forward(), 
iterationSink.getId(), 0);
+               
+               if (closed) {
+                       throw new IllegalStateException(
+                                       "An iterative data stream can only be 
closed once. Use union to close with multiple stream.");
                }
+               closed = true;
+               
+               streamGraph.addIterationTail((List) 
iterationTail.unionedStreams, iterationID,
+                               keepPartitioning);
+
                return iterationTail;
        }
        
@@ -138,7 +140,8 @@ public class IterativeDataStream<IN> extends
         * @return A {@link ConnectedIterativeDataStream}.
         */
        public <F> ConnectedIterativeDataStream<IN, F> 
withFeedbackType(TypeInformation<F> feedbackType) {
-               return new ConnectedIterativeDataStream<IN, F>(this, 
feedbackType);
+               return new ConnectedIterativeDataStream<IN, F>(new 
IterativeDataStream<IN>(this,
+                               iterationWaitTime), feedbackType);
        }
        
        /**
@@ -201,14 +204,16 @@ public class IterativeDataStream<IN> extends
                 * @return The feedback stream.
                 * 
                 */
+               @SuppressWarnings({ "rawtypes", "unchecked" })
                public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-                       DataStream<F> iterationSink = new 
DataStreamSink<F>(input.environment, "Iteration Sink",
-                                       null, null);
+                       if (input.closed) {
+                               throw new IllegalStateException(
+                                               "An iterative data stream can 
only be closed once. Use union to close with multiple stream.");
+                       }
+                       input.closed = true;
                        
-                       
input.streamGraph.addIterationTail(iterationSink.getId(), 
feedbackStream.getId(), input.iterationID,
-                                       input.iterationWaitTime);
-
-                       input.connectGraph(feedbackStream, 
iterationSink.getId(), 0);
+                       input.streamGraph.addIterationTail((List) 
feedbackStream.unionedStreams,
+                                       input.iterationID, true);
                        return feedbackStream;
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 36a94c7..6b95fe7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -57,8 +57,8 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 
                DataStream<OUT> returnStream = copy();
 
-               for (DataStream<OUT> ds : returnStream.unionizedStreams) {
-                       ds.userDefinedNames = Arrays.asList(outputNames);
+               for (DataStream<OUT> ds : returnStream.unionedStreams) {
+                       ds.selectedNames = Arrays.asList(outputNames);
                }
                return returnStream;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0784582..6a44104 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -206,12 +206,12 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       public void setIterationId(Integer iterationId) {
-               config.setInteger(ITERATION_ID, iterationId);
+       public void setIterationId(String iterationId) {
+               config.setString(ITERATION_ID, iterationId);
        }
 
-       public Integer getIterationId() {
-               return config.getInteger(ITERATION_ID, 0);
+       public String getIterationId() {
+               return config.getString(ITERATION_ID, "");
        }
 
        public void setIterationWaitTime(long time) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 293f5e0..47d97df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -46,7 +46,7 @@ public class StreamEdge implements Serializable {
         * output selection).
         */
        final private List<String> selectedNames;
-       final private StreamPartitioner<?> outputPartitioner;
+       private StreamPartitioner<?> outputPartitioner;
 
        public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
                        List<String> selectedNames, StreamPartitioner<?> 
outputPartitioner) {
@@ -87,6 +87,10 @@ public class StreamEdge implements Serializable {
        public StreamPartitioner<?> getPartitioner() {
                return outputPartitioner;
        }
+       
+       public void setPartitioner(StreamPartitioner<?> partitioner) {
+               this.outputPartitioner = partitioner;
+       }
 
        @Override
        public int hashCode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cae24be..64c349e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -79,6 +81,7 @@ public class StreamGraph extends StreamingPlan {
 
        private Map<Integer, StreamLoop> streamLoops;
        protected Map<Integer, StreamLoop> vertexIDtoLoop;
+       protected Map<Integer, String> vertexIDtoBrokerID;
        private StateHandleProvider<?> stateHandleProvider;
        private boolean forceCheckpoint = false;
 
@@ -97,7 +100,8 @@ public class StreamGraph extends StreamingPlan {
        public void clear() {
                streamNodes = new HashMap<Integer, StreamNode>();
                streamLoops = new HashMap<Integer, StreamLoop>();
-               vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
+               vertexIDtoLoop = new HashMap<Integer, StreamLoop>();
+               vertexIDtoBrokerID = new HashMap<Integer, String>();
                sources = new HashSet<Integer>();
        }
 
@@ -120,9 +124,9 @@ public class StreamGraph extends StreamingPlan {
        public void setCheckpointingInterval(long checkpointingInterval) {
                this.checkpointingInterval = checkpointingInterval;
        }
-       
+
        public void forceCheckpoint() {
-               this.forceCheckpoint = true;    
+               this.forceCheckpoint = true;
        }
 
        public void setStateHandleProvider(StateHandleProvider<?> provider) {
@@ -179,8 +183,9 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-                       TwoInputStreamOperator<IN1, IN2, OUT> 
taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
-                       TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo, String operatorName) {
+                       TwoInputStreamOperator<IN1, IN2, OUT> 
taskoperatorObject,
+                       TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> 
in2TypeInfo,
+                       TypeInformation<OUT> outTypeInfo, String operatorName) {
 
                addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, 
operatorName);
 
@@ -196,59 +201,192 @@ public class StreamGraph extends StreamingPlan {
                }
        }
 
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       public void addIterationHead(Integer sourceID, Integer iterationHead, 
Integer iterationID,
-                       long timeOut, TypeInformation<?> feedbackType) {
+       public void addIterationHead(Integer iterationHead, Integer 
iterationID, long timeOut,
+                       TypeInformation<?> feedbackType) {
+               // If there is no loop object created for this iteration create 
one
+               StreamLoop loop = streamLoops.get(iterationID);
+               if (loop == null) {
+                       loop = new StreamLoop(iterationID, timeOut, 
feedbackType);
+                       streamLoops.put(iterationID, loop);
+               }
 
-               StreamNode itSource = addNode(sourceID, 
StreamIterationHead.class, null, null);
+               loop.addHeadOperator(getStreamNode(iterationHead));
+       }
 
-               StreamLoop iteration = new StreamLoop(iterationID, 
getStreamNode(sourceID), timeOut);
-               streamLoops.put(iterationID, iteration);
-               vertexIDtoLoop.put(sourceID, iteration);
+       public void addIterationTail(List<DataStream<?>> feedbackStreams, 
Integer iterationID,
+                       boolean keepPartitioning) {
 
-               itSource.setOperatorName("IterationSource-" + sourceID);
-               
itSource.setParallelism(getStreamNode(iterationHead).getParallelism());
-               
-               if(feedbackType == null){
-                       setSerializersFrom(iterationHead, sourceID);
-                       addEdge(sourceID, iterationHead, new 
RebalancePartitioner(true), 0, new ArrayList<String>());
-               }else{
-                       itSource.setSerializerOut(new 
StreamRecordSerializer(feedbackType, executionConfig));
-                       addEdge(sourceID, iterationHead, new 
RebalancePartitioner(true), 2, new ArrayList<String>());
+               if (!streamLoops.containsKey(iterationID)) {
+                       throw new RuntimeException("Cannot close iteration 
without head operator.");
                }
-               
 
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SOURCE: {}", sourceID);
+               StreamLoop loop = streamLoops.get(iterationID);
+
+               for (DataStream<?> stream : feedbackStreams) {
+                       loop.addTailOperator(getStreamNode(stream.getId()), 
stream.getPartitioner(),
+                                       stream.getSelectedNames());
                }
 
-               sources.add(sourceID);
+               if (keepPartitioning) {
+                       loop.applyTailPartitioning();
+               }
        }
 
-       public void addIterationTail(Integer sinkID, Integer iterationTail, 
Integer iterationID,
-                       long waitTime) {
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       public void finalizeLoops() {
+               
+               // We create each loop separately, the order does not matter as 
sinks
+               // and sources don't interact
+               for (StreamLoop loop : streamLoops.values()) {
+
+                       // We make sure not to re-create the loops if the 
method is called
+                       // multiple times
+                       if (loop.getSourceSinkPairs().isEmpty()) {
+
+                               List<StreamNode> headOps = loop.getHeads();
+                               List<StreamNode> tailOps = loop.getTails();
+
+                               // This means that the iteration was not 
closed. It should not
+                               // be
+                               // allowed.
+                               if (tailOps.isEmpty()) {
+                                       throw new RuntimeException("Cannot 
execute job with empty iterations.");
+                               }
+
+                               // Check whether we keep the feedback 
partitioning
+                               if (loop.keepsPartitioning()) {
+                                       // This is the complicated case as we 
need to enforce
+                                       // partitioning on the tail -> sink 
side, which
+                                       // requires strict forward connections 
at source -> head
+
+                                       // We need one source/sink pair per 
different head
+                                       // parallelism
+                                       // as we depend on strict forwards 
connections
+                                       Map<Integer, List<StreamNode>> 
parallelismToHeads = new HashMap<Integer, List<StreamNode>>();
+
+                                       // Group head operators by parallelism
+                                       for (StreamNode head : headOps) {
+                                               int p = head.getParallelism();
+                                               if 
(!parallelismToHeads.containsKey(p)) {
+                                                       
parallelismToHeads.put(p, new ArrayList<StreamNode>());
+                                               }
+                                               
parallelismToHeads.get(p).add(head);
+                                       }
+
+                                       // We create the sink/source pair for 
each parallelism
+                                       // group,
+                                       // tails will forward to all sinks but 
each head operator
+                                       // will
+                                       // only receive from one source 
(corresponding to its
+                                       // parallelism)
+                                       int c = 0;
+                                       for (Entry<Integer, List<StreamNode>> 
headGroup : parallelismToHeads.entrySet()) {
+                                               List<StreamNode> headOpsInGroup 
= headGroup.getValue();
+
+                                               Tuple2<StreamNode, StreamNode> 
sourceSinkPair = createItSourceAndSink(loop,
+                                                               c);
+                                               StreamNode source = 
sourceSinkPair.f0;
+                                               StreamNode sink = 
sourceSinkPair.f1;
+
+                                               // We connect the source to the 
heads in this group
+                                               // (forward), setting
+                                               // type to 2 in case we have a 
coIteration (this sets
+                                               // the
+                                               // input as the second input of 
the co-operator)
+                                               for (StreamNode head : 
headOpsInGroup) {
+                                                       int inputType = 
loop.isCoIteration() ? 2 : 0;
+                                                       addEdge(source.getId(), 
head.getId(), new RebalancePartitioner(true),
+                                                                       
inputType, new ArrayList<String>());
+                                               }
+
+                                               // We connect all the tails to 
the sink keeping the
+                                               // partitioner
+                                               for (int i = 0; i < 
tailOps.size(); i++) {
+                                                       StreamNode tail = 
tailOps.get(i);
+                                                       StreamPartitioner<?> 
partitioner = loop.getTailPartitioners().get(i);
+                                                       addEdge(tail.getId(), 
sink.getId(), partitioner.copy(), 0, loop
+                                                                       
.getTailSelectedNames().get(i));
+                                               }
+
+                                               // We set the sink/source 
parallelism to the group
+                                               // parallelism
+                                               
source.setParallelism(headGroup.getKey());
+                                               
sink.setParallelism(source.getParallelism());
+
+                                               // We set the proper 
serializers for the sink/source
+                                               
setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+                                               if (loop.isCoIteration()) {
+                                                       
source.setSerializerOut(new StreamRecordSerializer(loop
+                                                                       
.getFeedbackType(), executionConfig));
+                                               } else {
+                                                       
setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
+                                               }
+
+                                               c++;
+                                       }
+
+                               } else {
+                                       // This is the most simple case, we add 
one iteration
+                                       // sink/source pair with the 
parallelism of the first tail
+                                       // operator. Tail operators will 
forward the records and
+                                       // partitioning will be enforced from 
source -> head
+
+                                       Tuple2<StreamNode, StreamNode> 
sourceSinkPair = createItSourceAndSink(loop, 0);
+                                       StreamNode source = sourceSinkPair.f0;
+                                       StreamNode sink = sourceSinkPair.f1;
+
+                                       // We get the feedback partitioner from 
the first input of
+                                       // the
+                                       // first head.
+                                       StreamPartitioner<?> partitioner = 
headOps.get(0).getInEdges().get(0)
+                                                       .getPartitioner();
+
+                                       // Connect the sources to heads using 
this partitioner
+                                       for (StreamNode head : headOps) {
+                                               addEdge(source.getId(), 
head.getId(), partitioner.copy(), 0,
+                                                               new 
ArrayList<String>());
+                                       }
+
+                                       // The tails are connected to the sink 
with forward
+                                       // partitioning
+                                       for (int i = 0; i < tailOps.size(); 
i++) {
+                                               StreamNode tail = 
tailOps.get(i);
+                                               addEdge(tail.getId(), 
sink.getId(), new RebalancePartitioner(true), 0, loop
+                                                               
.getTailSelectedNames().get(i));
+                                       }
+
+                                       // We set the parallelism to match the 
first tail op to make
+                                       // the
+                                       // forward more efficient
+                                       
sink.setParallelism(tailOps.get(0).getParallelism());
+                                       
source.setParallelism(sink.getParallelism());
+
+                                       // We set the proper serializers
+                                       
setSerializersFrom(headOps.get(0).getId(), source.getId());
+                                       
setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+                               }
 
-               if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
-                       throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
-               }
+                       }
 
-               StreamNode itSink = addNode(sinkID, StreamIterationTail.class, 
null, null);
+               }
 
-               StreamLoop iteration = streamLoops.get(iterationID);
-               iteration.setSink(getStreamNode(sinkID));
-               vertexIDtoLoop.put(sinkID, iteration);
-               
-               itSink.setParallelism(iteration.getSource().getParallelism());
+       }
 
-               setSerializersFrom(iterationTail, sinkID);
-               getStreamNode(sinkID).setOperatorName("IterationSink-" + 
sinkID);
+       private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop 
loop, int c) {
+               StreamNode source = addNode(-1 * streamNodes.size(), 
StreamIterationHead.class, null, null);
+               sources.add(source.getId());
 
-               setBufferTimeout(iteration.getSource().getId(), 
getStreamNode(iterationTail).getBufferTimeout());
+               StreamNode sink = addNode(-1 * streamNodes.size(), 
StreamIterationTail.class, null, null);
 
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SINK: {}", sinkID);
-               }
+               source.setOperatorName("IterationSource-" + loop.getID() + "_" 
+ c);
+               sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c);
+               vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c);
+               vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c);
+               vertexIDtoLoop.put(source.getId(), loop);
+               vertexIDtoLoop.put(sink.getId(), loop);
+               loop.addSourceSinkPair(source, sink);
 
+               return new Tuple2<StreamNode, StreamNode>(source, sink);
        }
 
        protected StreamNode addNode(Integer vertexID, Class<? extends 
AbstractInvokable> vertexClass,
@@ -284,7 +422,7 @@ public class StreamGraph extends StreamingPlan {
                getStreamNode(vertexID).setParallelism(parallelism);
        }
 
-       public void setKey(Integer vertexID, KeySelector<?,?> key) {
+       public void setKey(Integer vertexID, KeySelector<?, ?> key) {
                getStreamNode(vertexID).setStatePartitioner(key);
        }
 
@@ -382,6 +520,10 @@ public class StreamGraph extends StreamingPlan {
                return vertexIDtoLoop.get(vertexID).getID();
        }
 
+       public String getBrokerID(Integer vertexID) {
+               return vertexIDtoBrokerID.get(vertexID);
+       }
+
        public long getLoopTimeout(Integer vertexID) {
                return vertexIDtoLoop.get(vertexID).getTimeout();
        }
@@ -421,13 +563,13 @@ public class StreamGraph extends StreamingPlan {
         *            name of the jobGraph
         */
        public JobGraph getJobGraph(String jobGraphName) {
-
+               finalizeLoops();
                // temporarily forbid checkpointing for iterative jobs
                if (isIterative() && isCheckpointingEnabled() && 
!forceCheckpoint) {
                        throw new UnsupportedOperationException(
                                        "Checkpointing is currently not 
supported by default for iterative jobs, as we cannot guarantee exactly once 
semantics. "
-                                       + "State checkpoints happen normally, 
but records in-transit during the snapshot will be lost upon failure. "
-                                       + "\nThe user can force enable state 
checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
+                                                       + "State checkpoints 
happen normally, but records in-transit during the snapshot will be lost upon 
failure. "
+                                                       + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
                setJobName(jobGraphName);
@@ -474,45 +616,4 @@ public class StreamGraph extends StreamingPlan {
                DEFAULT, ISOLATE, NEWGROUP
        }
 
-       /**
-        * Object for representing loops in streaming programs.
-        * 
-        */
-       public static class StreamLoop {
-
-               private Integer loopID;
-
-               private StreamNode source;
-               private StreamNode sink;
-               
-               private Long timeout;
-
-               public StreamLoop(Integer loopID, StreamNode source, Long 
timeout) {
-                       this.loopID = loopID;
-                       this.source = source;
-                       this.timeout = timeout;
-               }
-
-               public Integer getID() {
-                       return loopID;
-               }
-
-               public Long getTimeout() {
-                       return timeout;
-               }
-
-               public void setSink(StreamNode sink) {
-                       this.sink = sink;
-               }
-
-               public StreamNode getSource() {
-                       return source;
-               }
-
-               public StreamNode getSink() {
-                       return sink;
-               }
-
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
new file mode 100644
index 0000000..ba987ef
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
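+/**
+ * Represents a loop (iteration) in a streaming program: its head and tail operators, the
+ * partitioners and selected names of the tails, and the iteration source/sink node pairs.
+ */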
+public class StreamLoop {
+
+       private Integer loopID;
+
+       private List<StreamNode> headOperators = new ArrayList<StreamNode>();
+       private List<StreamNode> tailOperators = new ArrayList<StreamNode>();
+       private List<StreamPartitioner<?>> tailPartitioners = new 
ArrayList<StreamPartitioner<?>>();
+       private List<List<String>> tailSelectedNames = new 
ArrayList<List<String>>();
+
+       private boolean coIteration = false;
+       private TypeInformation<?> feedbackType = null;
+
+       private long timeout;
+       private boolean tailPartitioning = false;
+
+       private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new 
ArrayList<Tuple2<StreamNode, StreamNode>>();
+
+       public StreamLoop(Integer loopID, long timeout, TypeInformation<?> 
feedbackType) {
+               this.loopID = loopID;
+               this.timeout = timeout;
+               if (feedbackType != null) {
+                       this.feedbackType = feedbackType;
+                       coIteration = true;
+                       tailPartitioning = true;
+               }
+       }
+
+       public Integer getID() {
+               return loopID;
+       }
+
+       public long getTimeout() {
+               return timeout;
+       }
+
+       public boolean isCoIteration() {
+               return coIteration;
+       }
+
+       public TypeInformation<?> getFeedbackType() {
+               return feedbackType;
+       }
+
+       public void addSourceSinkPair(StreamNode source, StreamNode sink) {
+               this.sourcesAndSinks.add(new Tuple2<StreamNode, 
StreamNode>(source, sink));
+       }
+
+       public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() {
+               return this.sourcesAndSinks;
+       }
+
+       public void addHeadOperator(StreamNode head) {
+               this.headOperators.add(head);
+       }
+
+       public void addTailOperator(StreamNode tail, StreamPartitioner<?> 
partitioner,
+                       List<String> selectedNames) {
+               this.tailOperators.add(tail);
+               this.tailPartitioners.add(partitioner);
+               this.tailSelectedNames.add(selectedNames);
+       }
+
+       public void applyTailPartitioning() {
+               this.tailPartitioning = true;
+       }
+
+       public boolean keepsPartitioning() {
+               return tailPartitioning;
+       }
+
+       public List<StreamNode> getHeads() {
+               return headOperators;
+       }
+
+       public List<StreamNode> getTails() {
+               return tailOperators;
+       }
+
+       public List<StreamPartitioner<?>> getTailPartitioners() {
+               return tailPartitioners;
+       }
+
+       public List<List<String>> getTailSelectedNames() {
+               return tailSelectedNames;
+       }
+
+       @Override
+       public String toString() {
+               return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" 
+ "Tail: " + tailOperators
+                               + "\n" + "TP: " + tailPartitioners + "\n" + 
"TSN: " + tailSelectedNames;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index eb34e3f..4d541bc 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -30,17 +30,17 @@ import java.util.Map.Entry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import 
org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -250,6 +250,7 @@ public class StreamingJobGraphGenerator {
                return retConfig;
        }
 
+       @SuppressWarnings("unchecked")
        private void setVertexConfig(Integer vertexID, StreamConfig config,
                        List<StreamEdge> chainableOutputs, List<StreamEdge> 
nonChainableOutputs) {
 
@@ -276,7 +277,7 @@ public class StreamingJobGraphGenerator {
 
                if (vertexClass.equals(StreamIterationHead.class)
                                || 
vertexClass.equals(StreamIterationTail.class)) {
-                       config.setIterationId(streamGraph.getLoopID(vertexID));
+                       
config.setIterationId(streamGraph.getBrokerID(vertexID));
                        
config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
                }
 
@@ -360,13 +361,19 @@ public class StreamingJobGraphGenerator {
                }
 
                for (StreamLoop loop : streamGraph.getStreamLoops()) {
-                       CoLocationGroup ccg = new CoLocationGroup();
-                       JobVertex tail = 
jobVertices.get(loop.getSink().getId());
-                       JobVertex head = 
jobVertices.get(loop.getSource().getId());
-                       ccg.addVertex(head);
-                       ccg.addVertex(tail);
-                       tail.updateCoLocationGroup(ccg);
-                       head.updateCoLocationGroup(ccg);
+                       for (Tuple2<StreamNode, StreamNode> pair : 
loop.getSourceSinkPairs()) {
+                               
+                               CoLocationGroup ccg = new CoLocationGroup();
+                               
+                               JobVertex source = 
jobVertices.get(pair.f0.getId());
+                               JobVertex sink = 
jobVertices.get(pair.f1.getId());
+                               
+                               ccg.addVertex(source);
+                               ccg.addVertex(sink);
+                               source.updateCoLocationGroup(ccg);
+                               sink.updateCoLocationGroup(ccg);
+                       }
+
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index 70d9c6b..e6ad821 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -49,4 +49,9 @@ public class RebalancePartitioner<T> extends 
StreamPartitioner<T> {
        public StreamPartitioner<T> copy() {
                return new RebalancePartitioner<T>(forward);
        }
+       
+       @Override
+       public String toString() {
+               return forward ? "ForwardPartitioner" : "RebalancePartitioner";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index ef598c6..b37655b 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -45,4 +45,9 @@ public abstract class StreamPartitioner<T> implements
        public StreamPartitioner<T> copy() {
                return this;
        }
+       
+       @Override
+       public String toString() {
+               return this.getClass().getSimpleName();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 4952cdf..25fe83d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -48,12 +48,12 @@ public class StreamIterationHead<OUT> extends 
OneInputStreamTask<OUT, OUT> {
                super.registerInputOutput();
                outputHandler = new OutputHandler<OUT>(this);
 
-               Integer iterationId = configuration.getIterationId();
+               String iterationId = configuration.getIterationId();
                iterationWaitTime = configuration.getIterationWaitTime();
                shouldWait = iterationWaitTime > 0;
 
                try {
-                       
BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" 
+                       BlockingQueueBroker.instance().handIn(iterationId+"-" 
                                        
+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
                } catch (Exception e) {
                        throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 5bbae06..b6e3889 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -30,7 +30,7 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationTail.class);
 
-       private Integer iterationId;
+       private String iterationId;
 
        @SuppressWarnings("rawtypes")
        private BlockingQueue<StreamRecord> dataChannel;
@@ -47,7 +47,7 @@ public class StreamIterationTail<IN> extends 
OneInputStreamTask<IN, IN> {
                        iterationId = configuration.getIterationId();
                        iterationWaitTime = 
configuration.getIterationWaitTime();
                        shouldWait = iterationWaitTime > 0;
-                       dataChannel = 
BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+                       dataChannel = 
BlockingQueueBroker.instance().get(iterationId+"-"
                                        
+getEnvironment().getIndexInSubtaskGroup());
                } catch (Exception e) {
                        throw new StreamTaskException(String.format(

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 3021abb..2a88a32 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -18,175 +18,318 @@
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import 
org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class IterateTest {
+@SuppressWarnings({ "unchecked", "unused", "serial" })
+public class IterateTest extends StreamingMultipleProgramsTestBase {
 
        private static final long MEMORYSIZE = 32;
        private static boolean iterated[];
        private static int PARALLELISM = 2;
 
-       public static final class IterationHead extends 
RichFlatMapFunction<Boolean, Boolean> {
-
-               private static final long serialVersionUID = 1L;
+       @Test
+       public void testException() throws Exception {
 
-               @Override
-               public void flatMap(Boolean value, Collector<Boolean> out) 
throws Exception {
-                       int indx = getRuntimeContext().getIndexOfThisSubtask();
-                       if (value) {
-                               iterated[indx] = true;
-                       } else {
-                               out.collect(value);
-                       }
+               StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+               DataStream<Integer> source = env.fromElements(1, 10);
+               IterativeDataStream<Integer> iter1 = source.iterate();
+               IterativeDataStream<Integer> iter2 = source.iterate();
 
+               iter1.closeWith(iter1.map(NoOpIntMap));
+               // Check for double closing
+               try {
+                       iter1.closeWith(iter1.map(NoOpIntMap));
+                       fail();
+               } catch (Exception e) {
                }
 
-       }
+               // Check for closing iteration without head
+               try {
+                       iter2.closeWith(iter1.map(NoOpIntMap));
+                       fail();
+               } catch (Exception e) {
+               }
 
-       public static final class IterationTail extends 
RichFlatMapFunction<Boolean, Boolean> {
+               iter2.map(NoOpIntMap);
 
-               private static final long serialVersionUID = 1L;
+               // Check for executing with empty iteration
+               try {
+                       env.execute();
+                       fail();
+               } catch (Exception e) {
+               }
+       }
 
-               @Override
-               public void flatMap(Boolean value, Collector<Boolean> out) 
throws Exception {
-                       out.collect(true);
+       @Test
+       public void testImmutabilityWithCoiteration() {
+               StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+               DataStream<Integer> source = env.fromElements(1, 10);
 
-               }
+               IterativeDataStream<Integer> iter1 = source.iterate();
+               // Calling withFeedbackType should create a new iteration
+               ConnectedIterativeDataStream<Integer, String> iter2 = 
iter1.withFeedbackType(String.class);
 
-       }
+               iter1.closeWith(iter1.map(NoOpIntMap));
+               iter2.closeWith(iter2.map(NoOpCoMap));
 
-       public static final class MySink implements SinkFunction<Boolean> {
+               StreamGraph graph = env.getStreamGraph();
 
-               private static final long serialVersionUID = 1L;
+               graph.getJobGraph();
 
-               @Override
-               public void invoke(Boolean tuple) {
+               assertEquals(2, graph.getStreamLoops().size());
+               for (StreamLoop loop : graph.getStreamLoops()) {
+                       assertEquals(loop.getHeads(), loop.getTails());
+                       List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = 
loop.getSourceSinkPairs();
+                       assertEquals(1, sourceSinkPairs.size());
                }
        }
 
-       public static final class NoOpMap implements MapFunction<Boolean, 
Boolean> {
+       @Test
+       public void testmultipleHeadsTailsSimple() {
+               StreamExecutionEnvironment env = new TestStreamEnvironment(4, 
MEMORYSIZE);
+               DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 
5).shuffle();
+               DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
 
-               private static final long serialVersionUID = 1L;
+               IterativeDataStream<Integer> iter1 = 
source1.union(source2).iterate();
 
-               @Override
-               public Boolean map(Boolean value) throws Exception {
-                       return value;
-               }
+               DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+               DataStream<Integer> head2 = 
iter1.map(NoOpIntMap).setParallelism(2);
+               DataStream<Integer> head3 = 
iter1.map(NoOpIntMap).setParallelism(2)
+                               .addSink(new NoOpSink<Integer>());
+               DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new 
NoOpSink<Integer>());
 
-       }
+               SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 
5).split(
+                               new OutputSelector<Integer>() {
 
-       public StreamExecutionEnvironment 
constructIterativeJob(StreamExecutionEnvironment env) {
-               env.setBufferTimeout(10);
+                                       @Override
+                                       public Iterable<String> select(Integer 
value) {
+                                               return value % 2 == 0 ? 
Arrays.asList("even") : Arrays.asList("odd");
+                                       }
+                               });
 
-               DataStream<Boolean> source = 
env.fromCollection(Collections.nCopies(PARALLELISM, false));
+               iter1.closeWith(source3.select("even").union(
+                               
head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle()));
 
-               IterativeDataStream<Boolean> iteration = source.iterate(3000);
+               StreamGraph graph = env.getStreamGraph();
 
-               DataStream<Boolean> increment = iteration.flatMap(new 
IterationHead()).flatMap(
-                               new IterationTail());
+               JobGraph jg = graph.getJobGraph();
 
-               iteration.closeWith(increment).addSink(new MySink());
-               return env;
-       }
+               assertEquals(1, graph.getStreamLoops().size());
+               StreamLoop loop = new 
ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
 
-       @Test
-       public void testColocation() throws Exception {
-               StreamExecutionEnvironment env = new TestStreamEnvironment(4, 
MEMORYSIZE);
+               assertEquals(4, loop.getHeads().size());
+               assertEquals(3, loop.getTails().size());
 
-               IterativeDataStream<Boolean> it = 
env.fromElements(true).rebalance().map(new NoOpMap())
-                               .iterate();
+               assertEquals(1, loop.getSourceSinkPairs().size());
+               Tuple2<StreamNode, StreamNode> pair = 
loop.getSourceSinkPairs().get(0);
 
-               DataStream<Boolean> head = it.map(new 
NoOpMap()).setParallelism(2).name("HeadOperator");
+               assertEquals(pair.f0.getParallelism(), 
pair.f1.getParallelism());
+               assertEquals(4, pair.f0.getOutEdges().size());
+               assertEquals(3, pair.f1.getInEdges().size());
 
-               it.closeWith(head.map(new 
NoOpMap()).setParallelism(3).name("TailOperator")).print();
+               for (StreamEdge edge : pair.f0.getOutEdges()) {
+                       assertTrue(edge.getPartitioner() instanceof 
ShufflePartitioner);
+               }
+               for (StreamEdge edge : pair.f1.getInEdges()) {
+                       assertTrue(edge.getPartitioner() instanceof 
RebalancePartitioner);
+               }
 
-               JobGraph graph = env.getStreamGraph().getJobGraph();
+               
assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
 
-               JobVertex itSource = null;
-               JobVertex itSink = null;
-               JobVertex headOp = null;
-               JobVertex tailOp = null;
+               // Test co-location
 
-               for (JobVertex vertex : graph.getVertices()) {
+               JobVertex itSource1 = null;
+               JobVertex itSink1 = null;
+
+               for (JobVertex vertex : jg.getVertices()) {
                        if (vertex.getName().contains("IterationSource")) {
-                               itSource = vertex;
+                               itSource1 = vertex;
                        } else if (vertex.getName().contains("IterationSink")) {
-                               itSink = vertex;
-                       } else if (vertex.getName().contains("HeadOperator")) {
-                               headOp = vertex;
-                       } else if (vertex.getName().contains("TailOp")) {
-                               tailOp = vertex;
+
+                               itSink1 = vertex;
+
                        }
                }
 
-               assertTrue(itSource.getCoLocationGroup() != null);
-               assertEquals(itSource.getCoLocationGroup(), 
itSink.getCoLocationGroup());
-               assertEquals(headOp.getParallelism(), 2);
-               assertEquals(tailOp.getParallelism(), 3);
-               assertEquals(itSource.getParallelism(), 
itSink.getParallelism());
+               assertTrue(itSource1.getCoLocationGroup() != null);
+               assertEquals(itSource1.getCoLocationGroup(), 
itSink1.getCoLocationGroup());
        }
 
-       @SuppressWarnings("unchecked")
        @Test
-       public void testPartitioning() throws Exception {
+       public void testmultipleHeadsTailsWithTailPartitioning() {
                StreamExecutionEnvironment env = new TestStreamEnvironment(4, 
MEMORYSIZE);
+               DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 
5).shuffle();
+               DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
 
-               IterativeDataStream<Boolean> it = 
env.fromElements(true).iterate();
+               IterativeDataStream<Integer> iter1 = 
source1.union(source2).iterate();
 
-               IterativeDataStream<Boolean> it2 = 
env.fromElements(true).iterate();
+               DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+               DataStream<Integer> head2 = 
iter1.map(NoOpIntMap).setParallelism(2).name("shuffle");
+               DataStream<Integer> head3 = 
iter1.map(NoOpIntMap).setParallelism(2)
+                               .addSink(new NoOpSink<Integer>());
+               DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new 
NoOpSink<Integer>());
 
-               DataStream<Boolean> head = it.map(new 
NoOpMap()).name("Head1").broadcast();
-               DataStream<Boolean> head2 = it2.map(new 
NoOpMap()).name("Head2").broadcast();
+               SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 
5).name("split")
+                               .split(new OutputSelector<Integer>() {
 
-               it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), 
true);
-               it2.closeWith(head2, false);
+                                       @Override
+                                       public Iterable<String> select(Integer 
value) {
+                                               return value % 2 == 0 ? 
Arrays.asList("even") : Arrays.asList("odd");
+                                       }
+                               });
+
+               iter1.closeWith(
+                               source3.select("even").union(
+                                               
head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"),
+                                               head2.shuffle()), true);
 
                StreamGraph graph = env.getStreamGraph();
 
-               for (StreamLoop loop : graph.getStreamLoops()) {
-                       StreamEdge tailToSink = 
loop.getSink().getInEdges().get(0);
-                       if 
(tailToSink.getSourceVertex().getOperatorName().contains("Head1")) {
-                               assertTrue(tailToSink.getPartitioner() 
instanceof BroadcastPartitioner);
-                               
assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof 
ShufflePartitioner);
-                       } else {
-                               assertTrue(tailToSink.getPartitioner() 
instanceof RebalancePartitioner);
+               JobGraph jg = graph.getJobGraph();
+
+               assertEquals(1, graph.getStreamLoops().size());
+
+               StreamLoop loop = new 
ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+
+               assertEquals(4, loop.getHeads().size());
+               assertEquals(3, loop.getTails().size());
+
+               assertEquals(2, loop.getSourceSinkPairs().size());
+               List<Tuple2<StreamNode, StreamNode>> pairs = 
loop.getSourceSinkPairs();
+               Tuple2<StreamNode, StreamNode> pair1 = 
pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0)
+                               : pairs.get(1);
+               Tuple2<StreamNode, StreamNode> pair2 = 
pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0)
+                               : pairs.get(1);
+
+               assertEquals(pair1.f0.getParallelism(), 
pair1.f1.getParallelism());
+               assertEquals(2, pair1.f0.getParallelism());
+               assertEquals(2, pair1.f0.getOutEdges().size());
+               assertEquals(3, pair1.f1.getInEdges().size());
+
+               for (StreamEdge edge : pair1.f0.getOutEdges()) {
+                       assertTrue(edge.getPartitioner() instanceof 
RebalancePartitioner);
+                       assertEquals(2, 
edge.getTargetVertex().getParallelism());
+               }
+               for (StreamEdge edge : pair1.f1.getInEdges()) {
+                       String tailName = 
edge.getSourceVertex().getOperatorName();
+                       if (tailName.equals("split")) {
+                               assertTrue(edge.getPartitioner() instanceof 
RebalancePartitioner);
+                       } else if (tailName.equals("bc")) {
+                               assertTrue(edge.getPartitioner() instanceof 
BroadcastPartitioner);
+                       } else if (tailName.equals("shuffle")) {
+                               assertTrue(edge.getPartitioner() instanceof 
ShufflePartitioner);
+                       }
+
+               }
+
+               assertEquals(pair2.f0.getParallelism(), 
pair2.f1.getParallelism());
+               assertEquals(4, pair2.f0.getParallelism());
+               assertEquals(2, pair2.f0.getOutEdges().size());
+               assertEquals(3, pair2.f1.getInEdges().size());
+
+               for (StreamEdge edge : pair2.f0.getOutEdges()) {
+                       assertTrue(edge.getPartitioner() instanceof 
RebalancePartitioner);
+                       assertEquals(4, 
edge.getTargetVertex().getParallelism());
+               }
+               for (StreamEdge edge : pair2.f1.getInEdges()) {
+                       String tailName = 
edge.getSourceVertex().getOperatorName();
+                       if (tailName.equals("split")) {
+                               assertTrue(edge.getPartitioner() instanceof 
RebalancePartitioner);
+                       } else if (tailName.equals("bc")) {
+                               assertTrue(edge.getPartitioner() instanceof 
BroadcastPartitioner);
+                       } else if (tailName.equals("shuffle")) {
+                               assertTrue(edge.getPartitioner() instanceof 
ShufflePartitioner);
                        }
+
                }
 
+               
assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
+
+               // Test co-location
+
+               JobVertex itSource1 = null;
+               JobVertex itSource2 = null;
+               JobVertex itSink1 = null;
+               JobVertex itSink2 = null;
+
+               for (JobVertex vertex : jg.getVertices()) {
+                       if (vertex.getName().contains("IterationSource")) {
+                               if (vertex.getName().contains("_0")) {
+                                       itSource1 = vertex;
+                               } else if (vertex.getName().contains("_1")) {
+                                       itSource2 = vertex;
+                               }
+                       } else if (vertex.getName().contains("IterationSink")) {
+                               if (vertex.getName().contains("_0")) {
+                                       itSink1 = vertex;
+                               } else if (vertex.getName().contains("_1")) {
+                                       itSink2 = vertex;
+                               }
+                       }
+               }
+
+               assertTrue(itSource1.getCoLocationGroup() != null);
+               assertTrue(itSource2.getCoLocationGroup() != null);
+
+               assertEquals(itSource1.getCoLocationGroup(), 
itSink1.getCoLocationGroup());
+               assertEquals(itSource2.getCoLocationGroup(), 
itSink2.getCoLocationGroup());
+               assertNotEquals(itSource1.getCoLocationGroup(), 
itSource2.getCoLocationGroup());
        }
 
+       @SuppressWarnings("rawtypes")
        @Test
-       public void test() throws Exception {
-               StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+       public void testSimpleIteration() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(PARALLELISM);
                iterated = new boolean[PARALLELISM];
 
-               env = constructIterativeJob(env);
+               DataStream<Boolean> source = env
+                               .fromCollection(Collections.nCopies(PARALLELISM 
* 2, false));
+
+               IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+               DataStream<Boolean> increment = iteration.flatMap(new 
IterationHead()).map(NoOpBoolMap);
+
+               iteration.map(NoOpBoolMap).addSink(new NoOpSink());
+
+               iteration.closeWith(increment).addSink(new NoOpSink());
 
                env.execute();
 
@@ -195,55 +338,135 @@ public class IterateTest {
                }
 
        }
-       
+
        @Test
        public void testCoIteration() throws Exception {
-               StreamExecutionEnvironment env = new TestStreamEnvironment(2, 
MEMORYSIZE);
-               
-               
-               ConnectedIterativeDataStream<Integer, String> coIt =  
env.fromElements(0, 0).iterate(2000).withFeedbackType("String");
-               
-               try{
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(2);
+
+               ConnectedIterativeDataStream<Integer, String> coIt = 
env.fromElements(0, 0).iterate(2000)
+                               .withFeedbackType("String");
+
+               try {
                        coIt.groupBy(1, 2);
                        fail();
-               } catch (UnsupportedOperationException e){}
-               
-               DataStream<String> head = coIt.flatMap(new 
CoFlatMapFunction<Integer, String, String>() {
+               } catch (UnsupportedOperationException e) {
+               }
+
+               DataStream<String> head = coIt
+                               .flatMap(new RichCoFlatMapFunction<Integer, 
String, String>() {
+
+                                       private static final long 
serialVersionUID = 1L;
+                                       boolean seenFromSource = false;
+
+                                       @Override
+                                       public void flatMap1(Integer value, 
Collector<String> out) throws Exception {
+                                               out.collect(((Integer) (value + 
1)).toString());
+                                       }
+
+                                       @Override
+                                       public void flatMap2(String value, 
Collector<String> out) throws Exception {
+                                               Integer intVal = 
Integer.valueOf(value);
+                                               if (intVal < 2) {
+                                                       out.collect(((Integer) 
(intVal + 1)).toString());
+                                               }
+                                               if (intVal == 1000 || intVal == 
2000) {
+                                                       seenFromSource = true;
+                                               }
+                                       }
 
-                       private static final long serialVersionUID = 1L;
+                                       @Override
+                                       public void close() {
+                                               assertTrue(seenFromSource);
+                                       }
+                               });
+
+               coIt.map(new CoMapFunction<Integer, String, String>() {
 
                        @Override
-                       public void flatMap1(Integer value, Collector<String> 
out) throws Exception {
-                               out.collect(((Integer) (value + 1)).toString());
+                       public String map1(Integer value) throws Exception {
+                               return value.toString();
                        }
 
                        @Override
-                       public void flatMap2(String value, Collector<String> 
out) throws Exception {
-                               Integer intVal = Integer.valueOf(value);
-                               if(intVal < 2){
-                                       out.collect(((Integer) (intVal + 
1)).toString());
-                               }
-                               
+                       public String map2(String value) throws Exception {
+                               return value;
                        }
-               });
-               
-               coIt.closeWith(head.broadcast());
-       
+               }).setParallelism(1).addSink(new NoOpSink<String>());
+
+               coIt.closeWith(head.broadcast().union(env.fromElements("1000", 
"2000").rebalance()));
+
                head.addSink(new TestSink()).setParallelism(1);
-               
+
                env.execute();
-               
-               assertEquals(new 
HashSet<String>(Arrays.asList("1","1","2","2","2","2")), TestSink.collected);
+
+               Collections.sort(TestSink.collected);
+               assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), 
TestSink.collected);
+               assertEquals(2, new 
ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0)
+                               .getSourceSinkPairs().size());
 
        }
 
        @Test
+       public void testGroupByFeedback() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(3);
+
+               KeySelector<Integer, Integer> key = new KeySelector<Integer, 
Integer>() {
+
+                       @Override
+                       public Integer getKey(Integer value) throws Exception {
+                               return value % 3;
+                       }
+               };
+
+               DataStream<Integer> source = env.fromElements(1, 2, 3);
+
+               IterativeDataStream<Integer> it = 
source.groupBy(key).iterate(3000);
+
+               DataStream<Integer> head = it.flatMap(new 
RichFlatMapFunction<Integer, Integer>() {
+
+                       int received = 0;
+                       int key = -1;
+
+                       @Override
+                       public void flatMap(Integer value, Collector<Integer> 
out) throws Exception {
+                               received++;
+                               if (key == -1) {
+                                       key = value % 3;
+                               } else {
+                                       assertEquals(key, value % 3);
+                               }
+                               if (value > 0) {
+                                       out.collect(value - 1);
+                               }
+                       }
+
+                       @Override
+                       public void close() {
+                               assertTrue(received > 1);
+                       }
+               });
+
+               
it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).setParallelism(2).groupBy(key)),
+                               true).addSink(new NoOpSink<Integer>());
+
+               env.execute();
+       }
+
+       @SuppressWarnings("deprecation")
+       @Test
        public void testWithCheckPointing() throws Exception {
                StreamExecutionEnvironment env = new 
TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+               env.enableCheckpointing();
 
-               env = constructIterativeJob(env);
+               DataStream<Boolean> source = env
+                               .fromCollection(Collections.nCopies(PARALLELISM 
* 2, false));
+
+               IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+               iteration.closeWith(iteration.flatMap(new IterationHead()));
 
-               env.enableCheckpointing();
                try {
                        env.execute();
 
@@ -252,8 +475,7 @@ public class IterateTest {
                } catch (UnsupportedOperationException e) {
                        // expected behaviour
                }
-               
-               
+
                // Test force checkpointing
 
                try {
@@ -265,22 +487,75 @@ public class IterateTest {
                } catch (UnsupportedOperationException e) {
                        // expected behaviour
                }
-               
+
                env.enableCheckpointing(1, true);
                env.getStreamGraph().getJobGraph();
+       }
+
+       public static final class IterationHead extends 
RichFlatMapFunction<Boolean, Boolean> {
+               public void flatMap(Boolean value, Collector<Boolean> out) 
throws Exception {
+                       int indx = getRuntimeContext().getIndexOfThisSubtask();
+                       if (value) {
+                               iterated[indx] = true;
+                       } else {
+                               out.collect(true);
+                       }
+               }
+       }
+
+       public static final class NoOpSink<T> extends RichSinkFunction<T> {
+               private List<T> received;
+
+               public void invoke(T tuple) {
+                       received.add(tuple);
+               }
+
+               public void open(Configuration conf) {
+                       received = new ArrayList<T>();
+               }
 
+               public void close() {
+                       assertTrue(received.size() > 0);
+               }
        }
-       
-       public static class TestSink implements SinkFunction<String>{
+
+       public static CoMapFunction<Integer, String, String> NoOpCoMap = new 
CoMapFunction<Integer, String, String>() {
+
+               public String map1(Integer value) throws Exception {
+                       return value.toString();
+               }
+
+               public String map2(String value) throws Exception {
+                       return value;
+               }
+       };
+
+       public static MapFunction<Integer, Integer> NoOpIntMap = new 
MapFunction<Integer, Integer>() {
+
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+
+       };
+
+       public static MapFunction<Boolean, Boolean> NoOpBoolMap = new 
MapFunction<Boolean, Boolean>() {
+
+               public Boolean map(Boolean value) throws Exception {
+                       return value;
+               }
+
+       };
+
+       public static class TestSink implements SinkFunction<String> {
 
                private static final long serialVersionUID = 1L;
-               public static Set<String> collected = new HashSet<String>();
-               
+               public static List<String> collected = new ArrayList<String>();
+
                @Override
                public void invoke(String value) throws Exception {
                        collected.add(value);
                }
-               
+
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fbd6502..2b0f60e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -78,6 +78,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Returns the parallelism of this operation.
    */
   def getParallelism = javaStream.getParallelism
+  
+  def getPartitioner = javaStream.getPartitioner
+  
+  def getSelectedNames = javaStream.getSelectedNames
 
   /**
    * Returns the execution config.

Reply via email to