Repository: flink
Updated Branches:
  refs/heads/master 517289dc5 -> b263932e2


[streaming] Refactor IterativeDataStream so that iteration head/tail wiring is self-contained


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b263932e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b263932e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b263932e

Branch: refs/heads/master
Commit: b263932e2e246824fa5e7538e62f39a53b5e9c17
Parents: 517289d
Author: Gyula Fora <[email protected]>
Authored: Tue Jan 27 00:17:30 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Tue Jan 27 21:42:04 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamGraph.java | 22 +++----------
 .../streaming/api/datastream/DataStream.java    | 15 ---------
 .../api/datastream/IterativeDataStream.java     | 34 ++++++++++++++++++--
 3 files changed, 36 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 2a0d0c7..c515e63 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -212,13 +212,13 @@ public class StreamGraph {
         *            Max waiting time for next record
         */
        public void addIterationTail(String vertexName, String iterationTail, 
Integer iterationID,
-                       int parallelism, long waitTime) {
+                       long waitTime) {
 
                if (bufferTimeouts.get(iterationTail) == 0) {
                        throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
                }
 
-               addVertex(vertexName, StreamIterationTail.class, null, null, 
parallelism);
+               addVertex(vertexName, StreamIterationTail.class, null, null, 
getParallelism(iterationTail));
 
                iterationIds.put(vertexName, iterationID);
                iterationIDtoTailName.put(iterationID, vertexName);
@@ -226,6 +226,9 @@ public class StreamGraph {
                setSerializersFrom(iterationTail, vertexName);
                iterationTimeouts.put(iterationIDtoTailName.get(iterationID), 
waitTime);
 
+               setParallelism(iterationIDtoHeadName.get(iterationID), 
getParallelism(iterationTail));
+               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeouts.get(iterationTail));
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("ITERATION SINK: {}", vertexName);
                }
@@ -365,21 +368,6 @@ public class StreamGraph {
        }
 
        /**
-        * Sets the parallelism and buffertimeout of the iteration head of the 
given
-        * iteration id to the parallelism given.
-        * 
-        * @param iterationID
-        *            ID of the iteration
-        * @param iterationTail
-        *            ID of the iteration tail
-        */
-       public void setIterationSourceSettings(String iterationID, String 
iterationTail) {
-               setParallelism(iterationIDtoHeadName.get(iterationID),
-                               operatorParallelisms.get(iterationTail));
-               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeouts.get(iterationTail));
-       }
-
-       /**
         * Sets a user defined {@link OutputSelector} for the given operator. 
Used
         * for directed emits.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b7ad97e..c2f5d44 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1095,16 +1095,6 @@ public class DataStream<OUT> {
                return returnStream;
        }
 
-       protected <R> DataStream<OUT> addIterationSource(Integer iterationID, 
long waitTime) {
-
-               DataStream<R> returnStream = new 
DataStreamSource<R>(environment, "iterationSource", null,
-                               null, true);
-
-               streamGraph.addIterationHead(returnStream.getId(), 
this.getId(), iterationID,
-                               degreeOfParallelism, waitTime);
-
-               return this.copy();
-       }
 
        /**
         * Method for passing user defined invokables along with the type
@@ -1132,11 +1122,6 @@ public class DataStream<OUT> {
 
                connectGraph(inputStream, returnStream.getId(), 0);
 
-               if (inputStream instanceof IterativeDataStream) {
-                       IterativeDataStream<OUT> iterativeStream = 
(IterativeDataStream<OUT>) inputStream;
-                       
returnStream.addIterationSource(iterativeStream.iterationID, 
iterativeStream.waitTime);
-               }
-
                return returnStream;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 6f66b2c..306bfe8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.
@@ -61,18 +64,43 @@ public class IterativeDataStream<IN> extends
         * 
         */
        public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
-               DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "iterationSink", null,
+               DataStream<IN> iterationSink = new 
DataStreamSink<IN>(environment, "Iteration Sink", null,
                                null);
 
+               // We add an iteration sink to the tail which will send tuples 
to the
+               // iteration head
                streamGraph.addIterationTail(iterationSink.getId(), 
iterationTail.getId(), iterationID,
-                               iterationTail.getParallelism(), waitTime);
+                               waitTime);
 
-               streamGraph.setIterationSourceSettings(iterationID.toString(), 
iterationTail.getId());
                connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
                return iterationTail;
        }
 
        @Override
+       public <R> SingleOutputStreamOperator<R, ?> transform(String 
operatorName,
+                       TypeInformation<R> outTypeInfo, StreamInvokable<IN, R> 
invokable) {
+
+               // We call the superclass tranform method
+               SingleOutputStreamOperator<R, ?> returnStream = 
super.transform(operatorName, outTypeInfo,
+                               invokable);
+
+               // Then we add a source that will take care of receiving 
feedback tuples
+               // from the tail
+               addIterationSource(returnStream);
+
+               return returnStream;
+       }
+
+       private <X> void addIterationSource(DataStream<X> dataStream) {
+
+               DataStream<X> iterationSource = new 
DataStreamSource<X>(environment, "Iteration Source",
+                               null, null, true);
+
+               streamGraph.addIterationHead(iterationSource.getId(), 
dataStream.getId(), iterationID,
+                               dataStream.getParallelism(), waitTime);
+       }
+
+       @Override
        public IterativeDataStream<IN> copy() {
                return new IterativeDataStream<IN>(this, iterationID, waitTime);
        }

Reply via email to