Repository: flink Updated Branches: refs/heads/master 4b53e66c4 -> ff750e61a
[streaming] Discretizer reuse bugfixes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff750e61 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff750e61 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff750e61 Branch: refs/heads/master Commit: ff750e61a556a96f0cc55997706a0fac7f764dca Parents: 4b53e66 Author: Gyula Fora <gyf...@apache.org> Authored: Tue May 12 20:08:53 2015 +0200 Committer: Gyula Fora <gyf...@apache.org> Committed: Tue May 12 20:08:53 2015 +0200 ---------------------------------------------------------------------- .../streaming/api/graph/WindowingOptimizer.java | 74 ++++++++++++-------- 1 file changed, 44 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff750e61/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java index dfcdc8d..a0dcdf7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.graph; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -82,48 +83,61 @@ public class WindowingOptimizer { private static void setDiscretizerReuse(StreamGraph streamGraph) { - Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = streamGraph.getOperators(); - List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>(); + Collection<StreamNode> nodes = streamGraph.getStreamNodes(); + List<StreamNode> discretizers = new ArrayList<StreamNode>(); - // Get the discretizers - for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) { - if (entry.f1 instanceof StreamDiscretizer) { - discretizers.add(new Tuple2<Integer, StreamDiscretizer<?>>(entry.f0, - (StreamDiscretizer<?>) entry.f1)); + for (StreamNode node : nodes) { + if (node.getOperator() instanceof StreamDiscretizer) { + discretizers.add(node); } } - List<Tuple2<StreamDiscretizer<?>, List<Integer>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<Integer>>>(); + List<Tuple2<StreamDiscretizer<?>, List<StreamNode>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<StreamNode>>>(); + + for (StreamNode discretizer : discretizers) { + boolean matchedAny = false; + for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> candidate : matchingDiscretizers) { + + Set<Integer> discretizerInEdges = new HashSet<Integer>( + discretizer.getInEdgeIndices()); + Set<Integer> toMatchInEdges = new HashSet<Integer>(candidate.f1.get(0) + .getInEdgeIndices()); + + boolean partitionersMatch = true; + + for (StreamEdge edge1 : discretizer.getInEdges()) { + for (StreamEdge edge2 : candidate.f1.get(0).getInEdges()) { + if (edge1.getPartitioner().getStrategy() != edge2.getPartitioner() + .getStrategy()) { + partitionersMatch = false; + } + } + } - for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) { - boolean inMatching = false; - for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) { - Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getStreamNode( - discretizer.f0).getInEdgeIndices()); - Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getStreamNode( - matching.f1.get(0)).getInEdgeIndices()); + if (partitionersMatch + && discretizer.getParallelism() == candidate.f1.get(0).getParallelism() + && discretizer.getOperator().equals(candidate.f0) + && discretizerInEdges.equals(toMatchInEdges)) { - if (discretizer.f1.equals(matching.f0) - && discretizerInEdges.equals(matchingInEdges)) { - matching.f1.add(discretizer.f0); - inMatching = true; + candidate.f1.add(discretizer); + matchedAny = true; break; } } - if (!inMatching) { - List<Integer> matchingNames = new ArrayList<Integer>(); - matchingNames.add(discretizer.f0); - matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<Integer>>( - discretizer.f1, matchingNames)); + if (!matchedAny) { + List<StreamNode> matchingNodes = new ArrayList<StreamNode>(); + matchingNodes.add(discretizer); + matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<StreamNode>>( + (StreamDiscretizer<?>) discretizer.getOperator(), matchingNodes)); } } - for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) { - List<Integer> matchList = matching.f1; + for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> matching : matchingDiscretizers) { + List<StreamNode> matchList = matching.f1; if (matchList.size() > 1) { - Integer first = matchList.get(0); + StreamNode first = matchList.get(0); for (int i = 1; i < matchList.size(); i++) { - replaceDiscretizer(streamGraph, matchList.get(i), first); + replaceDiscretizer(streamGraph, matchList.get(i).getID(), first.getID()); } } } @@ -132,8 +146,8 @@ public class WindowingOptimizer { private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID, Integer replaceWithID) { // Convert to array to create a copy - List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getStreamNode(toReplaceID) - .getOutEdges()); + List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph + .getStreamNode(toReplaceID).getOutEdges()); int numOutputs = outEdges.size();