Repository: flink Updated Branches: refs/heads/master 517289dc5 -> b263932e2
[streaming] Refactor iterative datastream for clear self-contained functionality Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b263932e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b263932e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b263932e Branch: refs/heads/master Commit: b263932e2e246824fa5e7538e62f39a53b5e9c17 Parents: 517289d Author: Gyula Fora <[email protected]> Authored: Tue Jan 27 00:17:30 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Tue Jan 27 21:42:04 2015 +0100 ---------------------------------------------------------------------- .../apache/flink/streaming/api/StreamGraph.java | 22 +++---------- .../streaming/api/datastream/DataStream.java | 15 --------- .../api/datastream/IterativeDataStream.java | 34 ++++++++++++++++++-- 3 files changed, 36 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 2a0d0c7..c515e63 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -212,13 +212,13 @@ public class StreamGraph { * Max waiting time for next record */ public void addIterationTail(String vertexName, String iterationTail, Integer iterationID, - int parallelism, long waitTime) { + long waitTime) { if 
(bufferTimeouts.get(iterationTail) == 0) { throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); } - addVertex(vertexName, StreamIterationTail.class, null, null, parallelism); + addVertex(vertexName, StreamIterationTail.class, null, null, getParallelism(iterationTail)); iterationIds.put(vertexName, iterationID); iterationIDtoTailName.put(iterationID, vertexName); @@ -226,6 +226,9 @@ public class StreamGraph { setSerializersFrom(iterationTail, vertexName); iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime); + setParallelism(iterationIDtoHeadName.get(iterationID), getParallelism(iterationTail)); + setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail)); + if (LOG.isDebugEnabled()) { LOG.debug("ITERATION SINK: {}", vertexName); } @@ -365,21 +368,6 @@ public class StreamGraph { } /** - * Sets the parallelism and buffertimeout of the iteration head of the given - * iteration id to the parallelism given. - * - * @param iterationID - * ID of the iteration - * @param iterationTail - * ID of the iteration tail - */ - public void setIterationSourceSettings(String iterationID, String iterationTail) { - setParallelism(iterationIDtoHeadName.get(iterationID), - operatorParallelisms.get(iterationTail)); - setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail)); - } - - /** * Sets a user defined {@link OutputSelector} for the given operator. Used * for directed emits. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index b7ad97e..c2f5d44 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1095,16 +1095,6 @@ public class DataStream<OUT> { return returnStream; } - protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) { - - DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, - null, true); - - streamGraph.addIterationHead(returnStream.getId(), this.getId(), iterationID, - degreeOfParallelism, waitTime); - - return this.copy(); - } /** * Method for passing user defined invokables along with the type @@ -1132,11 +1122,6 @@ public class DataStream<OUT> { connectGraph(inputStream, returnStream.getId(), 0); - if (inputStream instanceof IterativeDataStream) { - IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream; - returnStream.addIterationSource(iterativeStream.iterationID, iterativeStream.waitTime); - } - return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/b263932e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index 6f66b2c..306bfe8 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -17,6 +17,9 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.invokable.StreamInvokable; + /** * The iterative data stream represents the start of an iteration in a * {@link DataStream}. @@ -61,18 +64,43 @@ public class IterativeDataStream<IN> extends * */ public DataStream<IN> closeWith(DataStream<IN> iterationTail) { - DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null, + DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null, null); + // We add an iteration sink to the tail which will send tuples to the + // iteration head streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID, - iterationTail.getParallelism(), waitTime); + waitTime); - streamGraph.setIterationSourceSettings(iterationID.toString(), iterationTail.getId()); connectGraph(iterationTail.forward(), iterationSink.getId(), 0); return iterationTail; } @Override + public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, + TypeInformation<R> outTypeInfo, StreamInvokable<IN, R> invokable) { + + // We call the superclass transform method + SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, + invokable); + + // Then we add a source that will take care of receiving feedback tuples + // 
from the tail + addIterationSource(returnStream); + + return returnStream; + } + + private <X> void addIterationSource(DataStream<X> dataStream) { + + DataStream<X> iterationSource = new DataStreamSource<X>(environment, "Iteration Source", + null, null, true); + + streamGraph.addIterationHead(iterationSource.getId(), dataStream.getId(), iterationID, + dataStream.getParallelism(), waitTime); + } + + @Override public IterativeDataStream<IN> copy() { return new IterativeDataStream<IN>(this, iterationID, waitTime); }
