Repository: flink Updated Branches: refs/heads/master ea4f339d7 -> 3b69b2499
[FLINK-2335] [streaming] Lazy iteration construction in StreamGraph Closes #900 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b69b249 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b69b249 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b69b249 Branch: refs/heads/master Commit: 3b69b249991c23995dddc3b5182415f5c7df332a Parents: ea4f339 Author: Gyula Fora <[email protected]> Authored: Fri Jul 10 20:03:22 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Sat Jul 11 14:23:59 2015 +0200 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 48 +- .../api/datastream/IterativeDataStream.java | 43 +- .../api/datastream/SplitDataStream.java | 4 +- .../flink/streaming/api/graph/StreamConfig.java | 8 +- .../flink/streaming/api/graph/StreamEdge.java | 6 +- .../flink/streaming/api/graph/StreamGraph.java | 275 ++++++---- .../flink/streaming/api/graph/StreamLoop.java | 122 +++++ .../api/graph/StreamingJobGraphGenerator.java | 27 +- .../partitioner/RebalancePartitioner.java | 5 + .../runtime/partitioner/StreamPartitioner.java | 5 + .../runtime/tasks/StreamIterationHead.java | 4 +- .../runtime/tasks/StreamIterationTail.java | 4 +- .../apache/flink/streaming/api/IterateTest.java | 519 ++++++++++++++----- .../flink/streaming/api/scala/DataStream.scala | 4 + 14 files changed, 804 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index c9c1f49..7896169 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -95,11 +95,11 @@ public class DataStream<OUT> { protected final StreamExecutionEnvironment environment; protected final Integer id; protected int parallelism; - protected List<String> userDefinedNames; + protected List<String> selectedNames; protected StreamPartitioner<OUT> partitioner; @SuppressWarnings("rawtypes") protected TypeInformation typeInfo; - protected List<DataStream<OUT>> unionizedStreams; + protected List<DataStream<OUT>> unionedStreams; protected Integer iterationID = null; protected Long iterationWaitTime = null; @@ -126,11 +126,11 @@ public class DataStream<OUT> { this.environment = environment; this.parallelism = environment.getParallelism(); this.streamGraph = environment.getStreamGraph(); - this.userDefinedNames = new ArrayList<String>(); + this.selectedNames = new ArrayList<String>(); this.partitioner = new RebalancePartitioner<OUT>(true); this.typeInfo = typeInfo; - this.unionizedStreams = new ArrayList<DataStream<OUT>>(); - this.unionizedStreams.add(this); + this.unionedStreams = new ArrayList<DataStream<OUT>>(); + this.unionedStreams.add(this); } /** @@ -143,17 +143,17 @@ public class DataStream<OUT> { this.environment = dataStream.environment; this.id = dataStream.id; this.parallelism = dataStream.parallelism; - this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames); + this.selectedNames = new ArrayList<String>(dataStream.selectedNames); this.partitioner = dataStream.partitioner.copy(); this.streamGraph = dataStream.streamGraph; this.typeInfo = dataStream.typeInfo; this.iterationID = dataStream.iterationID; this.iterationWaitTime = dataStream.iterationWaitTime; - this.unionizedStreams = new ArrayList<DataStream<OUT>>(); - this.unionizedStreams.add(this); - if (dataStream.unionizedStreams.size() > 1) { - for (int i = 1; i < dataStream.unionizedStreams.size(); i++) { - this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i))); + this.unionedStreams = new ArrayList<DataStream<OUT>>(); + this.unionedStreams.add(this); + if (dataStream.unionedStreams.size() > 1) { + for (int i = 1; i < dataStream.unionedStreams.size(); i++) { + this.unionedStreams.add(new DataStream<OUT>(dataStream.unionedStreams.get(i))); } } @@ -176,6 +176,14 @@ public class DataStream<OUT> { public int getParallelism() { return this.parallelism; } + + public StreamPartitioner<OUT> getPartitioner() { + return this.partitioner; + } + + public List<String> getSelectedNames(){ + return selectedNames; + } /** * Gets the type of the stream. @@ -248,9 +256,9 @@ public class DataStream<OUT> { DataStream<OUT> returnStream = this.copy(); for (DataStream<OUT> stream : streams) { - for (DataStream<OUT> ds : stream.unionizedStreams) { + for (DataStream<OUT> ds : stream.unionedStreams) { validateUnion(ds.getId()); - returnStream.unionizedStreams.add(ds.copy()); + returnStream.unionedStreams.add(ds.copy()); } } return returnStream; @@ -268,7 +276,7 @@ public class DataStream<OUT> { * @return The {@link SplitDataStream} */ public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) { - for (DataStream<OUT> ds : this.unionizedStreams) { + for (DataStream<OUT> ds : this.unionedStreams) { streamGraph.addOutputSelector(ds.getId(), clean(outputSelector)); } @@ -1103,9 +1111,7 @@ public class DataStream<OUT> { } protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<?> feedbackType) { - Integer id = ++counter; - streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime, feedbackType); - streamGraph.setParallelism(id, dataStream.getParallelism()); + streamGraph.addIterationHead(dataStream.getId(), iterationID, iterationWaitTime, feedbackType); } /** @@ -1118,7 +1124,7 @@ public class DataStream<OUT> { protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) { DataStream<OUT> returnStream = this.copy(); - for (DataStream<OUT> stream : returnStream.unionizedStreams) { + for (DataStream<OUT> stream : returnStream.unionedStreams) { stream.partitioner = partitioner; } @@ -1139,9 +1145,9 @@ public class DataStream<OUT> { * Number of the type (used at co-functions) */ protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) { - for (DataStream<X> stream : inputStream.unionizedStreams) { + for (DataStream<X> stream : inputStream.unionedStreams) { streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber, - inputStream.userDefinedNames); + inputStream.selectedNames); } } @@ -1170,7 +1176,7 @@ public class DataStream<OUT> { } private void validateUnion(Integer id) { - for (DataStream<OUT> ds : this.unionizedStreams) { + for (DataStream<OUT> ds : this.unionedStreams) { if (ds.getId().equals(id)) { throw new RuntimeException("A DataStream cannot be merged with itself"); } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index da3d885..4de368c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.api.datastream; +import java.util.List; + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -32,6 +34,8 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; */ public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, IterativeDataStream<IN>> { + + protected boolean closed = false; static Integer iterationCount = 0; @@ -60,20 +64,18 @@ public class IterativeDataStream<IN> extends * @return The feedback stream. * */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) { - DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null, - null); - - // We add an iteration sink to the tail which will send tuples to the - // iteration head - streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID, - iterationWaitTime); - - if (keepPartitioning) { - connectGraph(iterationTail, iterationSink.getId(), 0); - } else { - connectGraph(iterationTail.forward(), iterationSink.getId(), 0); + + if (closed) { + throw new IllegalStateException( + "An iterative data stream can only be closed once. Use union to close with multiple stream."); } + closed = true; + + streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID, + keepPartitioning); + return iterationTail; } @@ -138,7 +140,8 @@ public class IterativeDataStream<IN> extends * @return A {@link ConnectedIterativeDataStream}. */ public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) { - return new ConnectedIterativeDataStream<IN, F>(this, feedbackType); + return new ConnectedIterativeDataStream<IN, F>(new IterativeDataStream<IN>(this, + iterationWaitTime), feedbackType); } /** @@ -201,14 +204,16 @@ public class IterativeDataStream<IN> extends * @return The feedback stream. * */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public DataStream<F> closeWith(DataStream<F> feedbackStream) { - DataStream<F> iterationSink = new DataStreamSink<F>(input.environment, "Iteration Sink", - null, null); + if (input.closed) { + throw new IllegalStateException( + "An iterative data stream can only be closed once. Use union to close with multiple stream."); + } + input.closed = true; - input.streamGraph.addIterationTail(iterationSink.getId(), feedbackStream.getId(), input.iterationID, - input.iterationWaitTime); - - input.connectGraph(feedbackStream, iterationSink.getId(), 0); + input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams, + input.iterationID, true); return feedbackStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java index 36a94c7..6b95fe7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java @@ -57,8 +57,8 @@ public class SplitDataStream<OUT> extends DataStream<OUT> { DataStream<OUT> returnStream = copy(); - for (DataStream<OUT> ds : returnStream.unionizedStreams) { - ds.userDefinedNames = Arrays.asList(outputNames); + for (DataStream<OUT> ds : returnStream.unionedStreams) { + ds.selectedNames = Arrays.asList(outputNames); } return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 0784582..6a44104 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -206,12 +206,12 @@ public class StreamConfig implements Serializable { } } - public void setIterationId(Integer iterationId) { - config.setInteger(ITERATION_ID, iterationId); + public void setIterationId(String iterationId) { + config.setString(ITERATION_ID, iterationId); } - public Integer getIterationId() { - return config.getInteger(ITERATION_ID, 0); + public String getIterationId() { + return config.getString(ITERATION_ID, ""); } public void setIterationWaitTime(long time) { http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 293f5e0..47d97df 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -46,7 +46,7 @@ public class StreamEdge implements Serializable { * output selection). */ final private List<String> selectedNames; - final private StreamPartitioner<?> outputPartitioner; + private StreamPartitioner<?> outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, List<String> selectedNames, StreamPartitioner<?> outputPartitioner) { @@ -87,6 +87,10 @@ public class StreamEdge implements Serializable { public StreamPartitioner<?> getPartitioner() { return outputPartitioner; } + + public void setPartitioner(StreamPartitioner<?> partitioner) { + this.outputPartitioner = partitioner; + } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index cae24be..64c349e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.flink.api.common.ExecutionConfig; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.state.StateHandleProvider; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; @@ -79,6 +81,7 @@ public class StreamGraph extends StreamingPlan { private Map<Integer, StreamLoop> streamLoops; protected Map<Integer, StreamLoop> vertexIDtoLoop; + protected Map<Integer, String> vertexIDtoBrokerID; private StateHandleProvider<?> stateHandleProvider; private boolean forceCheckpoint = false; @@ -97,7 +100,8 @@ public class StreamGraph extends StreamingPlan { public void clear() { streamNodes = new HashMap<Integer, StreamNode>(); streamLoops = new HashMap<Integer, StreamLoop>(); - vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>(); + vertexIDtoLoop = new HashMap<Integer, StreamLoop>(); + vertexIDtoBrokerID = new HashMap<Integer, String>(); sources = new HashSet<Integer>(); } @@ -120,9 +124,9 @@ public class StreamGraph extends StreamingPlan { public void setCheckpointingInterval(long checkpointingInterval) { this.checkpointingInterval = checkpointingInterval; } - + public void forceCheckpoint() { - this.forceCheckpoint = true; + this.forceCheckpoint = true; } public void setStateHandleProvider(StateHandleProvider<?> provider) { @@ -179,8 +183,9 @@ public class StreamGraph extends StreamingPlan { } public <IN1, IN2, OUT> void addCoOperator(Integer vertexID, - TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo, - TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { + TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, + TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, + TypeInformation<OUT> outTypeInfo, String operatorName) { addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName); @@ -196,59 +201,192 @@ public class StreamGraph extends StreamingPlan { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID, - long timeOut, TypeInformation<?> feedbackType) { + public void addIterationHead(Integer iterationHead, Integer iterationID, long timeOut, + TypeInformation<?> feedbackType) { + // If there is no loop object created for this iteration create one + StreamLoop loop = streamLoops.get(iterationID); + if (loop == null) { + loop = new StreamLoop(iterationID, timeOut, feedbackType); + streamLoops.put(iterationID, loop); + } - StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null); + loop.addHeadOperator(getStreamNode(iterationHead)); + } - StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut); - streamLoops.put(iterationID, iteration); - vertexIDtoLoop.put(sourceID, iteration); + public void addIterationTail(List<DataStream<?>> feedbackStreams, Integer iterationID, + boolean keepPartitioning) { - itSource.setOperatorName("IterationSource-" + sourceID); - itSource.setParallelism(getStreamNode(iterationHead).getParallelism()); - - if(feedbackType == null){ - setSerializersFrom(iterationHead, sourceID); - addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>()); - }else{ - itSource.setSerializerOut(new StreamRecordSerializer(feedbackType, executionConfig)); - addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 2, new ArrayList<String>()); + if (!streamLoops.containsKey(iterationID)) { + throw new RuntimeException("Cannot close iteration without head operator."); } - - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SOURCE: {}", sourceID); + StreamLoop loop = streamLoops.get(iterationID); + + for (DataStream<?> stream : feedbackStreams) { + loop.addTailOperator(getStreamNode(stream.getId()), stream.getPartitioner(), + stream.getSelectedNames()); } - sources.add(sourceID); + if (keepPartitioning) { + loop.applyTailPartitioning(); + } } - public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iterationID, - long waitTime) { + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void finalizeLoops() { + + // We create each loop separately, the order does not matter as sinks + // and sources don't interact + for (StreamLoop loop : streamLoops.values()) { + + // We make sure not to re-create the loops if the method is called + // multiple times + if (loop.getSourceSinkPairs().isEmpty()) { + + List<StreamNode> headOps = loop.getHeads(); + List<StreamNode> tailOps = loop.getTails(); + + // This means that the iteration was not closed. It should not + // be + // allowed. + if (tailOps.isEmpty()) { + throw new RuntimeException("Cannot execute job with empty iterations."); + } + + // Check whether we keep the feedback partitioning + if (loop.keepsPartitioning()) { + // This is the complicated case as we need to enforce + // partitioning on the tail -> sink side, which + // requires strict forward connections at source -> head + + // We need one source/sink pair per different head + // parallelism + // as we depend on strict forwards connections + Map<Integer, List<StreamNode>> parallelismToHeads = new HashMap<Integer, List<StreamNode>>(); + + // Group head operators by parallelism + for (StreamNode head : headOps) { + int p = head.getParallelism(); + if (!parallelismToHeads.containsKey(p)) { + parallelismToHeads.put(p, new ArrayList<StreamNode>()); + } + parallelismToHeads.get(p).add(head); + } + + // We create the sink/source pair for each parallelism + // group, + // tails will forward to all sinks but each head operator + // will + // only receive from one source (corresponding to its + // parallelism) + int c = 0; + for (Entry<Integer, List<StreamNode>> headGroup : parallelismToHeads.entrySet()) { + List<StreamNode> headOpsInGroup = headGroup.getValue(); + + Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, + c); + StreamNode source = sourceSinkPair.f0; + StreamNode sink = sourceSinkPair.f1; + + // We connect the source to the heads in this group + // (forward), setting + // type to 2 in case we have a coIteration (this sets + // the + // input as the second input of the co-operator) + for (StreamNode head : headOpsInGroup) { + int inputType = loop.isCoIteration() ? 2 : 0; + addEdge(source.getId(), head.getId(), new RebalancePartitioner(true), + inputType, new ArrayList<String>()); + } + + // We connect all the tails to the sink keeping the + // partitioner + for (int i = 0; i < tailOps.size(); i++) { + StreamNode tail = tailOps.get(i); + StreamPartitioner<?> partitioner = loop.getTailPartitioners().get(i); + addEdge(tail.getId(), sink.getId(), partitioner.copy(), 0, loop + .getTailSelectedNames().get(i)); + } + + // We set the sink/source parallelism to the group + // parallelism + source.setParallelism(headGroup.getKey()); + sink.setParallelism(source.getParallelism()); + + // We set the proper serializers for the sink/source + setSerializersFrom(tailOps.get(0).getId(), sink.getId()); + if (loop.isCoIteration()) { + source.setSerializerOut(new StreamRecordSerializer(loop + .getFeedbackType(), executionConfig)); + } else { + setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId()); + } + + c++; + } + + } else { + // This is the most simple case, we add one iteration + // sink/source pair with the parallelism of the first tail + // operator. Tail operators will forward the records and + // partitioning will be enforced from source -> head + + Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, 0); + StreamNode source = sourceSinkPair.f0; + StreamNode sink = sourceSinkPair.f1; + + // We get the feedback partitioner from the first input of + // the + // first head. + StreamPartitioner<?> partitioner = headOps.get(0).getInEdges().get(0) + .getPartitioner(); + + // Connect the sources to heads using this partitioner + for (StreamNode head : headOps) { + addEdge(source.getId(), head.getId(), partitioner.copy(), 0, + new ArrayList<String>()); + } + + // The tails are connected to the sink with forward + // partitioning + for (int i = 0; i < tailOps.size(); i++) { + StreamNode tail = tailOps.get(i); + addEdge(tail.getId(), sink.getId(), new RebalancePartitioner(true), 0, loop + .getTailSelectedNames().get(i)); + } + + // We set the parallelism to match the first tail op to make + // the + // forward more efficient + sink.setParallelism(tailOps.get(0).getParallelism()); + source.setParallelism(sink.getParallelism()); + + // We set the proper serializers + setSerializersFrom(headOps.get(0).getId(), source.getId()); + setSerializersFrom(tailOps.get(0).getId(), sink.getId()); + } - if (getStreamNode(iterationTail).getBufferTimeout() == 0) { - throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); - } + } - StreamNode itSink = addNode(sinkID, StreamIterationTail.class, null, null); + } - StreamLoop iteration = streamLoops.get(iterationID); - iteration.setSink(getStreamNode(sinkID)); - vertexIDtoLoop.put(sinkID, iteration); - - itSink.setParallelism(iteration.getSource().getParallelism()); + } - setSerializersFrom(iterationTail, sinkID); - getStreamNode(sinkID).setOperatorName("IterationSink-" + sinkID); + private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop loop, int c) { + StreamNode source = addNode(-1 * streamNodes.size(), StreamIterationHead.class, null, null); + sources.add(source.getId()); - setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail).getBufferTimeout()); + StreamNode sink = addNode(-1 * streamNodes.size(), StreamIterationTail.class, null, null); - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SINK: {}", sinkID); - } + source.setOperatorName("IterationSource-" + loop.getID() + "_" + c); + sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c); + vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c); + vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c); + vertexIDtoLoop.put(source.getId(), loop); + vertexIDtoLoop.put(sink.getId(), loop); + loop.addSourceSinkPair(source, sink); + return new Tuple2<StreamNode, StreamNode>(source, sink); } protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass, @@ -284,7 +422,7 @@ public class StreamGraph extends StreamingPlan { getStreamNode(vertexID).setParallelism(parallelism); } - public void setKey(Integer vertexID, KeySelector<?,?> key) { + public void setKey(Integer vertexID, KeySelector<?, ?> key) { getStreamNode(vertexID).setStatePartitioner(key); } @@ -382,6 +520,10 @@ public class StreamGraph extends StreamingPlan { return vertexIDtoLoop.get(vertexID).getID(); } + public String getBrokerID(Integer vertexID) { + return vertexIDtoBrokerID.get(vertexID); + } + public long getLoopTimeout(Integer vertexID) { return vertexIDtoLoop.get(vertexID).getTimeout(); } @@ -421,13 +563,13 @@ public class StreamGraph extends StreamingPlan { * name of the jobGraph */ public JobGraph getJobGraph(String jobGraphName) { - + finalizeLoops(); // temporarily forbid checkpointing for iterative jobs if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) { throw new UnsupportedOperationException( "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " - + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " - + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); + + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " + + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } setJobName(jobGraphName); @@ -474,45 +616,4 @@ public class StreamGraph extends StreamingPlan { DEFAULT, ISOLATE, NEWGROUP } - /** - * Object for representing loops in streaming programs. - * - */ - public static class StreamLoop { - - private Integer loopID; - - private StreamNode source; - private StreamNode sink; - - private Long timeout; - - public StreamLoop(Integer loopID, StreamNode source, Long timeout) { - this.loopID = loopID; - this.source = source; - this.timeout = timeout; - } - - public Integer getID() { - return loopID; - } - - public Long getTimeout() { - return timeout; - } - - public void setSink(StreamNode sink) { - this.sink = sink; - } - - public StreamNode getSource() { - return source; - } - - public StreamNode getSink() { - return sink; - } - - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java new file mode 100644 index 0000000..ba987ef --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; + +/** + * Object for representing loops in streaming programs. + * + */ +public class StreamLoop { + + private Integer loopID; + + private List<StreamNode> headOperators = new ArrayList<StreamNode>(); + private List<StreamNode> tailOperators = new ArrayList<StreamNode>(); + private List<StreamPartitioner<?>> tailPartitioners = new ArrayList<StreamPartitioner<?>>(); + private List<List<String>> tailSelectedNames = new ArrayList<List<String>>(); + + private boolean coIteration = false; + private TypeInformation<?> feedbackType = null; + + private long timeout; + private boolean tailPartitioning = false; + + private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new ArrayList<Tuple2<StreamNode, StreamNode>>(); + + public StreamLoop(Integer loopID, long timeout, TypeInformation<?> feedbackType) { + this.loopID = loopID; + this.timeout = timeout; + if (feedbackType != null) { + this.feedbackType = feedbackType; + coIteration = true; + tailPartitioning = true; + } + } + + public Integer getID() { + return loopID; + } + + public long getTimeout() { + return timeout; + } + + public boolean isCoIteration() { + return coIteration; + } + + public TypeInformation<?> getFeedbackType() { + return feedbackType; + } + + public void addSourceSinkPair(StreamNode source, StreamNode sink) { + this.sourcesAndSinks.add(new Tuple2<StreamNode, StreamNode>(source, sink)); + } + + public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() { + return this.sourcesAndSinks; + } + + public void addHeadOperator(StreamNode head) { + this.headOperators.add(head); + } + + public void addTailOperator(StreamNode tail, StreamPartitioner<?> partitioner, + List<String> selectedNames) { + this.tailOperators.add(tail); + this.tailPartitioners.add(partitioner); + this.tailSelectedNames.add(selectedNames); + } + + public void applyTailPartitioning() { + this.tailPartitioning = true; + } + + public boolean keepsPartitioning() { + return tailPartitioning; + } + + public List<StreamNode> getHeads() { + return headOperators; + } + + public List<StreamNode> getTails() { + return tailOperators; + } + + public List<StreamPartitioner<?>> getTailPartitioners() { + return tailPartitioners; + } + + public List<List<String>> getTailSelectedNames() { + return tailSelectedNames; + } + + @Override + public String toString() { + return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" + "Tail: " + tailOperators + + "\n" + "TP: " + tailPartitioners + "\n" + "TSN: " + tailSelectedNames; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index eb34e3f..4d541bc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -30,17 +30,17 @@ import java.util.Map.Entry; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -250,6 +250,7 @@ public class StreamingJobGraphGenerator { return retConfig; } + @SuppressWarnings("unchecked") private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) { @@ -276,7 +277,7 @@ public class StreamingJobGraphGenerator { if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(streamGraph.getLoopID(vertexID)); + config.setIterationId(streamGraph.getBrokerID(vertexID)); config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID)); } @@ -360,13 +361,19 @@ public class StreamingJobGraphGenerator { } for (StreamLoop loop : streamGraph.getStreamLoops()) { - CoLocationGroup ccg = new CoLocationGroup(); - JobVertex tail = jobVertices.get(loop.getSink().getId()); - JobVertex head = jobVertices.get(loop.getSource().getId()); - ccg.addVertex(head); - ccg.addVertex(tail); - tail.updateCoLocationGroup(ccg); - head.updateCoLocationGroup(ccg); + for (Tuple2<StreamNode, StreamNode> pair : loop.getSourceSinkPairs()) { + + CoLocationGroup ccg = new CoLocationGroup(); + + JobVertex source = jobVertices.get(pair.f0.getId()); + JobVertex sink = jobVertices.get(pair.f1.getId()); + + ccg.addVertex(source); + ccg.addVertex(sink); + source.updateCoLocationGroup(ccg); + sink.updateCoLocationGroup(ccg); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java index 70d9c6b..e6ad821 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java @@ -49,4 +49,9 @@ public class RebalancePartitioner<T> extends StreamPartitioner<T> { public StreamPartitioner<T> copy() { return new RebalancePartitioner<T>(forward); } + + @Override + public String toString() { + return forward ? "ForwardPartitioner" : "RebalancePartitioner"; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java index ef598c6..b37655b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java @@ -45,4 +45,9 @@ public abstract class StreamPartitioner<T> implements public StreamPartitioner<T> copy() { return this; } + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 4952cdf..25fe83d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -48,12 +48,12 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { super.registerInputOutput(); outputHandler = new OutputHandler<OUT>(this); - Integer iterationId = configuration.getIterationId(); + String iterationId = configuration.getIterationId(); iterationWaitTime = configuration.getIterationWaitTime(); shouldWait = iterationWaitTime > 0; try { - BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" + BlockingQueueBroker.instance().handIn(iterationId+"-" +getEnvironment().getIndexInSubtaskGroup(), dataChannel); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 5bbae06..b6e3889 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -30,7 +30,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class); - private Integer iterationId; + private String iterationId; @SuppressWarnings("rawtypes") private BlockingQueue<StreamRecord> dataChannel; @@ -47,7 +47,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> { iterationId = configuration.getIterationId(); iterationWaitTime = configuration.getIterationWaitTime(); shouldWait = iterationWaitTime > 0; - dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-" + dataChannel = BlockingQueueBroker.instance().get(iterationId+"-" +getEnvironment().getIndexInSubtaskGroup()); } catch (Exception e) { throw new StreamTaskException(String.format( http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index 3021abb..2a88a32 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -18,175 +18,318 @@ package org.apache.flink.streaming.api; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.List; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream; import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream; +import org.apache.flink.streaming.api.datastream.SplitDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop; +import org.apache.flink.streaming.api.graph.StreamLoop; +import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.Collector; import org.junit.Test; -public class IterateTest { +@SuppressWarnings({ "unchecked", "unused", "serial" }) +public class IterateTest extends StreamingMultipleProgramsTestBase { private static final long MEMORYSIZE = 32; private static boolean iterated[]; private static int PARALLELISM = 2; - public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> { - - private static final long serialVersionUID = 1L; + @Test + public void testException() throws Exception { - @Override - public void flatMap(Boolean value, Collector<Boolean> out) throws Exception { - int indx = getRuntimeContext().getIndexOfThisSubtask(); - if (value) { - iterated[indx] = true; - } else { - out.collect(value); - } + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + DataStream<Integer> source = env.fromElements(1, 10); + IterativeDataStream<Integer> iter1 = source.iterate(); + IterativeDataStream<Integer> iter2 = source.iterate(); + iter1.closeWith(iter1.map(NoOpIntMap)); + // Check for double closing + try { + iter1.closeWith(iter1.map(NoOpIntMap)); + fail(); + } catch (Exception e) { } - } + // Check for closing iteration without head + try { + iter2.closeWith(iter1.map(NoOpIntMap)); + fail(); + } catch (Exception e) { + } - public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> { + iter2.map(NoOpIntMap); - private static final long serialVersionUID = 1L; + // Check for executing with empty iteration + try { + env.execute(); + fail(); + } catch (Exception e) { + } + } - @Override - public void flatMap(Boolean value, Collector<Boolean> out) throws Exception { - out.collect(true); + @Test + public void testImmutabilityWithCoiteration() { + StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + DataStream<Integer> source = env.fromElements(1, 10); - } + IterativeDataStream<Integer> iter1 = source.iterate(); + // Calling withFeedbackType should create a new iteration + ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class); - } + iter1.closeWith(iter1.map(NoOpIntMap)); + iter2.closeWith(iter2.map(NoOpCoMap)); - public static final class MySink implements SinkFunction<Boolean> { + StreamGraph graph = env.getStreamGraph(); - private static final long serialVersionUID = 1L; + graph.getJobGraph(); - @Override - public void invoke(Boolean tuple) { + assertEquals(2, graph.getStreamLoops().size()); + for (StreamLoop loop : graph.getStreamLoops()) { + assertEquals(loop.getHeads(), loop.getTails()); + List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = loop.getSourceSinkPairs(); + assertEquals(1, sourceSinkPairs.size()); } } - public static final class NoOpMap implements MapFunction<Boolean, Boolean> { + @Test + public void testmultipleHeadsTailsSimple() { + StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE); + DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle(); + DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5); - private static final long serialVersionUID = 1L; + IterativeDataStream<Integer> iter1 = source1.union(source2).iterate(); - @Override - public Boolean map(Boolean value) throws Exception { - return value; - } + DataStream<Integer> head1 = iter1.map(NoOpIntMap); + DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2); + DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2) + .addSink(new NoOpSink<Integer>()); + DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>()); - } + SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).split( + new OutputSelector<Integer>() { - public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env) { - env.setBufferTimeout(10); + @Override + public Iterable<String> select(Integer value) { + return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd"); + } + }); - DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false)); + iter1.closeWith(source3.select("even").union( + head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle())); - IterativeDataStream<Boolean> iteration = source.iterate(3000); + StreamGraph graph = env.getStreamGraph(); - DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap( - new IterationTail()); + JobGraph jg = graph.getJobGraph(); - iteration.closeWith(increment).addSink(new MySink()); - return env; - } + assertEquals(1, graph.getStreamLoops().size()); + StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0); - @Test - public void testColocation() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE); + assertEquals(4, loop.getHeads().size()); + assertEquals(3, loop.getTails().size()); - IterativeDataStream<Boolean> it = env.fromElements(true).rebalance().map(new NoOpMap()) - .iterate(); + assertEquals(1, loop.getSourceSinkPairs().size()); + Tuple2<StreamNode, StreamNode> pair = loop.getSourceSinkPairs().get(0); - DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator"); + assertEquals(pair.f0.getParallelism(), pair.f1.getParallelism()); + assertEquals(4, pair.f0.getOutEdges().size()); + assertEquals(3, pair.f1.getInEdges().size()); - it.closeWith(head.map(new NoOpMap()).setParallelism(3).name("TailOperator")).print(); + for (StreamEdge edge : pair.f0.getOutEdges()) { + assertTrue(edge.getPartitioner() instanceof ShufflePartitioner); + } + for (StreamEdge edge : pair.f1.getInEdges()) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + } - JobGraph graph = env.getStreamGraph().getJobGraph(); + assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even"))); - JobVertex itSource = null; - JobVertex itSink = null; - JobVertex headOp = null; - JobVertex tailOp = null; + // Test co-location - for (JobVertex vertex : graph.getVertices()) { + JobVertex itSource1 = null; + JobVertex itSink1 = null; + + for (JobVertex vertex : jg.getVertices()) { if (vertex.getName().contains("IterationSource")) { - itSource = vertex; + itSource1 = vertex; } else if (vertex.getName().contains("IterationSink")) { - itSink = vertex; - } else if (vertex.getName().contains("HeadOperator")) { - headOp = vertex; - } else if (vertex.getName().contains("TailOp")) { - tailOp = vertex; + + itSink1 = vertex; + } } - assertTrue(itSource.getCoLocationGroup() != null); - assertEquals(itSource.getCoLocationGroup(), itSink.getCoLocationGroup()); - assertEquals(headOp.getParallelism(), 2); - assertEquals(tailOp.getParallelism(), 3); - assertEquals(itSource.getParallelism(), itSink.getParallelism()); + assertTrue(itSource1.getCoLocationGroup() != null); + assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup()); } - @SuppressWarnings("unchecked") @Test - public void testPartitioning() throws Exception { + public void testmultipleHeadsTailsWithTailPartitioning() { StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE); + DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle(); + DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5); - IterativeDataStream<Boolean> it = env.fromElements(true).iterate(); + IterativeDataStream<Integer> iter1 = source1.union(source2).iterate(); - IterativeDataStream<Boolean> it2 = env.fromElements(true).iterate(); + DataStream<Integer> head1 = iter1.map(NoOpIntMap); + DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2).name("shuffle"); + DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2) + .addSink(new NoOpSink<Integer>()); + DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>()); - DataStream<Boolean> head = it.map(new NoOpMap()).name("Head1").broadcast(); - DataStream<Boolean> head2 = it2.map(new NoOpMap()).name("Head2").broadcast(); + SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).name("split") + .split(new OutputSelector<Integer>() { - it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true); - it2.closeWith(head2, false); + @Override + public Iterable<String> select(Integer value) { + return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd"); + } + }); + + iter1.closeWith( + source3.select("even").union( + head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"), + head2.shuffle()), true); StreamGraph graph = env.getStreamGraph(); - for (StreamLoop loop : graph.getStreamLoops()) { - StreamEdge tailToSink = loop.getSink().getInEdges().get(0); - if (tailToSink.getSourceVertex().getOperatorName().contains("Head1")) { - assertTrue(tailToSink.getPartitioner() instanceof BroadcastPartitioner); - assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof ShufflePartitioner); - } else { - assertTrue(tailToSink.getPartitioner() instanceof RebalancePartitioner); + JobGraph jg = graph.getJobGraph(); + + assertEquals(1, graph.getStreamLoops().size()); + + StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0); + + assertEquals(4, loop.getHeads().size()); + assertEquals(3, loop.getTails().size()); + + assertEquals(2, loop.getSourceSinkPairs().size()); + List<Tuple2<StreamNode, StreamNode>> pairs = loop.getSourceSinkPairs(); + Tuple2<StreamNode, StreamNode> pair1 = pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0) + : pairs.get(1); + Tuple2<StreamNode, StreamNode> pair2 = pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0) + : pairs.get(1); + + assertEquals(pair1.f0.getParallelism(), pair1.f1.getParallelism()); + assertEquals(2, pair1.f0.getParallelism()); + assertEquals(2, pair1.f0.getOutEdges().size()); + assertEquals(3, pair1.f1.getInEdges().size()); + + for (StreamEdge edge : pair1.f0.getOutEdges()) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + assertEquals(2, edge.getTargetVertex().getParallelism()); + } + for (StreamEdge edge : pair1.f1.getInEdges()) { + String tailName = edge.getSourceVertex().getOperatorName(); + if (tailName.equals("split")) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + } else if (tailName.equals("bc")) { + assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner); + } else if (tailName.equals("shuffle")) { + assertTrue(edge.getPartitioner() instanceof ShufflePartitioner); + } + + } + + assertEquals(pair2.f0.getParallelism(), pair2.f1.getParallelism()); + assertEquals(4, pair2.f0.getParallelism()); + assertEquals(2, pair2.f0.getOutEdges().size()); + assertEquals(3, pair2.f1.getInEdges().size()); + + for (StreamEdge edge : pair2.f0.getOutEdges()) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + assertEquals(4, edge.getTargetVertex().getParallelism()); + } + for (StreamEdge edge : pair2.f1.getInEdges()) { + String tailName = edge.getSourceVertex().getOperatorName(); + if (tailName.equals("split")) { + assertTrue(edge.getPartitioner() instanceof RebalancePartitioner); + } else if (tailName.equals("bc")) { + assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner); + } else if (tailName.equals("shuffle")) { + assertTrue(edge.getPartitioner() instanceof ShufflePartitioner); } + } + assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even"))); + + // Test co-location + + JobVertex itSource1 = null; + JobVertex itSource2 = null; + JobVertex itSink1 = null; + JobVertex itSink2 = null; + + for (JobVertex vertex : jg.getVertices()) { + if (vertex.getName().contains("IterationSource")) { + if (vertex.getName().contains("_0")) { + itSource1 = vertex; + } else if (vertex.getName().contains("_1")) { + itSource2 = vertex; + } + } else if (vertex.getName().contains("IterationSink")) { + if (vertex.getName().contains("_0")) { + itSink1 = vertex; + } else if (vertex.getName().contains("_1")) { + itSink2 = vertex; + } + } + } + + assertTrue(itSource1.getCoLocationGroup() != null); + assertTrue(itSource2.getCoLocationGroup() != null); + + assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup()); + assertEquals(itSource2.getCoLocationGroup(), itSink2.getCoLocationGroup()); + assertNotEquals(itSource1.getCoLocationGroup(), itSource2.getCoLocationGroup()); } + @SuppressWarnings("rawtypes") @Test - public void test() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + public void testSimpleIteration() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); iterated = new boolean[PARALLELISM]; - env = constructIterativeJob(env); + DataStream<Boolean> source = env + .fromCollection(Collections.nCopies(PARALLELISM * 2, false)); + + IterativeDataStream<Boolean> iteration = source.iterate(3000); + + DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap); + + iteration.map(NoOpBoolMap).addSink(new NoOpSink()); + + iteration.closeWith(increment).addSink(new NoOpSink()); env.execute(); @@ -195,55 +338,135 @@ public class IterateTest { } } - + @Test public void testCoIteration() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE); - - - ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000).withFeedbackType("String"); - - try{ + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000) + .withFeedbackType("String"); + + try { coIt.groupBy(1, 2); fail(); - } catch (UnsupportedOperationException e){} - - DataStream<String> head = coIt.flatMap(new CoFlatMapFunction<Integer, String, String>() { + } catch (UnsupportedOperationException e) { + } + + DataStream<String> head = coIt + .flatMap(new RichCoFlatMapFunction<Integer, String, String>() { + + private static final long serialVersionUID = 1L; + boolean seenFromSource = false; + + @Override + public void flatMap1(Integer value, Collector<String> out) throws Exception { + out.collect(((Integer) (value + 1)).toString()); + } + + @Override + public void flatMap2(String value, Collector<String> out) throws Exception { + Integer intVal = Integer.valueOf(value); + if (intVal < 2) { + out.collect(((Integer) (intVal + 1)).toString()); + } + if (intVal == 1000 || intVal == 2000) { + seenFromSource = true; + } + } - private static final long serialVersionUID = 1L; + @Override + public void close() { + assertTrue(seenFromSource); + } + }); + + coIt.map(new CoMapFunction<Integer, String, String>() { @Override - public void flatMap1(Integer value, Collector<String> out) throws Exception { - out.collect(((Integer) (value + 1)).toString()); + public String map1(Integer value) throws Exception { + return value.toString(); } @Override - public void flatMap2(String value, Collector<String> out) throws Exception { - Integer intVal = Integer.valueOf(value); - if(intVal < 2){ - out.collect(((Integer) (intVal + 1)).toString()); - } - + public String map2(String value) throws Exception { + return value; } - }); - - coIt.closeWith(head.broadcast()); - + }).setParallelism(1).addSink(new NoOpSink<String>()); + + coIt.closeWith(head.broadcast().union(env.fromElements("1000", "2000").rebalance())); + head.addSink(new TestSink()).setParallelism(1); - + env.execute(); - - assertEquals(new HashSet<String>(Arrays.asList("1","1","2","2","2","2")), TestSink.collected); + + Collections.sort(TestSink.collected); + assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected); + assertEquals(2, new ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0) + .getSourceSinkPairs().size()); } @Test + public void testGroupByFeedback() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + + KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() { + + @Override + public Integer getKey(Integer value) throws Exception { + return value % 3; + } + }; + + DataStream<Integer> source = env.fromElements(1, 2, 3); + + IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000); + + DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() { + + int received = 0; + int key = -1; + + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + received++; + if (key == -1) { + key = value % 3; + } else { + assertEquals(key, value % 3); + } + if (value > 0) { + out.collect(value - 1); + } + } + + @Override + public void close() { + assertTrue(received > 1); + } + }); + + it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).setParallelism(2).groupBy(key)), + true).addSink(new NoOpSink<Integer>()); + + env.execute(); + } + + @SuppressWarnings("deprecation") + @Test public void testWithCheckPointing() throws Exception { StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE); + env.enableCheckpointing(); - env = constructIterativeJob(env); + DataStream<Boolean> source = env + .fromCollection(Collections.nCopies(PARALLELISM * 2, false)); + + IterativeDataStream<Boolean> iteration = source.iterate(3000); + + iteration.closeWith(iteration.flatMap(new IterationHead())); - env.enableCheckpointing(); try { env.execute(); @@ -252,8 +475,7 @@ public class IterateTest { } catch (UnsupportedOperationException e) { // expected behaviour } - - + // Test force checkpointing try { @@ -265,22 +487,75 @@ public class IterateTest { } catch (UnsupportedOperationException e) { // expected behaviour } - + env.enableCheckpointing(1, true); env.getStreamGraph().getJobGraph(); + } + + public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> { + public void flatMap(Boolean value, Collector<Boolean> out) throws Exception { + int indx = getRuntimeContext().getIndexOfThisSubtask(); + if (value) { + iterated[indx] = true; + } else { + out.collect(true); + } + } + } + + public static final class NoOpSink<T> extends RichSinkFunction<T> { + private List<T> received; + + public void invoke(T tuple) { + received.add(tuple); + } + + public void open(Configuration conf) { + received = new ArrayList<T>(); + } + public void close() { + assertTrue(received.size() > 0); + } } - - public static class TestSink implements SinkFunction<String>{ + + public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() { + + public String map1(Integer value) throws Exception { + return value.toString(); + } + + public String map2(String value) throws Exception { + return value; + } + }; + + public static MapFunction<Integer, Integer> NoOpIntMap = new MapFunction<Integer, Integer>() { + + public Integer map(Integer value) throws Exception { + return value; + } + + }; + + public static MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() { + + public Boolean map(Boolean value) throws Exception { + return value; + } + + }; + + public static class TestSink implements SinkFunction<String> { private static final long serialVersionUID = 1L; - public static Set<String> collected = new HashSet<String>(); - + public static List<String> collected = new ArrayList<String>(); + @Override public void invoke(String value) throws Exception { collected.add(value); } - + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index fbd6502..2b0f60e 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -78,6 +78,10 @@ class DataStream[T](javaStream: JavaStream[T]) { * Returns the parallelism of this operation. */ def getParallelism = javaStream.getParallelism + + def getPartitioner = javaStream.getPartitioner + + def getSelectedNames = javaStream.getSelectedNames /** * Returns the execution config.
