Repository: flink
Updated Branches:
  refs/heads/master 4b53e66c4 -> ff750e61a


[streaming] Discretizer reuse bugfixes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff750e61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff750e61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff750e61

Branch: refs/heads/master
Commit: ff750e61a556a96f0cc55997706a0fac7f764dca
Parents: 4b53e66
Author: Gyula Fora <gyf...@apache.org>
Authored: Tue May 12 20:08:53 2015 +0200
Committer: Gyula Fora <gyf...@apache.org>
Committed: Tue May 12 20:08:53 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/graph/WindowingOptimizer.java | 74 ++++++++++++--------
 1 file changed, 44 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff750e61/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index dfcdc8d..a0dcdf7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -82,48 +83,61 @@ public class WindowingOptimizer {
 
        private static void setDiscretizerReuse(StreamGraph streamGraph) {
 
-               Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = 
streamGraph.getOperators();
-               List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new 
ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>();
+               Collection<StreamNode> nodes = streamGraph.getStreamNodes();
+               List<StreamNode> discretizers = new ArrayList<StreamNode>();
 
-               // Get the discretizers
-               for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) {
-                       if (entry.f1 instanceof StreamDiscretizer) {
-                               discretizers.add(new Tuple2<Integer, 
StreamDiscretizer<?>>(entry.f0,
-                                               (StreamDiscretizer<?>) 
entry.f1));
+               for (StreamNode node : nodes) {
+                       if (node.getOperator() instanceof StreamDiscretizer) {
+                               discretizers.add(node);
                        }
                }
 
-               List<Tuple2<StreamDiscretizer<?>, List<Integer>>> 
matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, 
List<Integer>>>();
+               List<Tuple2<StreamDiscretizer<?>, List<StreamNode>>> 
matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, 
List<StreamNode>>>();
+
+               for (StreamNode discretizer : discretizers) {
+                       boolean matchedAny = false;
+                       for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> 
candidate : matchingDiscretizers) {
+
+                               Set<Integer> discretizerInEdges = new 
HashSet<Integer>(
+                                               discretizer.getInEdgeIndices());
+                               Set<Integer> toMatchInEdges = new 
HashSet<Integer>(candidate.f1.get(0)
+                                               .getInEdgeIndices());
+
+                               boolean partitionersMatch = true;
+
+                               for (StreamEdge edge1 : 
discretizer.getInEdges()) {
+                                       for (StreamEdge edge2 : 
candidate.f1.get(0).getInEdges()) {
+                                               if 
(edge1.getPartitioner().getStrategy() != edge2.getPartitioner()
+                                                               .getStrategy()) 
{
+                                                       partitionersMatch = 
false;
+                                               }
+                                       }
+                               }
 
-               for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : 
discretizers) {
-                       boolean inMatching = false;
-                       for (Tuple2<StreamDiscretizer<?>, List<Integer>> 
matching : matchingDiscretizers) {
-                               Set<Integer> discretizerInEdges = new 
HashSet<Integer>(streamGraph.getStreamNode(
-                                               
discretizer.f0).getInEdgeIndices());
-                               Set<Integer> matchingInEdges = new 
HashSet<Integer>(streamGraph.getStreamNode(
-                                               
matching.f1.get(0)).getInEdgeIndices());
+                               if (partitionersMatch
+                                               && discretizer.getParallelism() 
== candidate.f1.get(0).getParallelism()
+                                               && 
discretizer.getOperator().equals(candidate.f0)
+                                               && 
discretizerInEdges.equals(toMatchInEdges)) {
 
-                               if (discretizer.f1.equals(matching.f0)
-                                               && 
discretizerInEdges.equals(matchingInEdges)) {
-                                       matching.f1.add(discretizer.f0);
-                                       inMatching = true;
+                                       candidate.f1.add(discretizer);
+                                       matchedAny = true;
                                        break;
                                }
                        }
-                       if (!inMatching) {
-                               List<Integer> matchingNames = new 
ArrayList<Integer>();
-                               matchingNames.add(discretizer.f0);
-                               matchingDiscretizers.add(new 
Tuple2<StreamDiscretizer<?>, List<Integer>>(
-                                               discretizer.f1, matchingNames));
+                       if (!matchedAny) {
+                               List<StreamNode> matchingNodes = new 
ArrayList<StreamNode>();
+                               matchingNodes.add(discretizer);
+                               matchingDiscretizers.add(new 
Tuple2<StreamDiscretizer<?>, List<StreamNode>>(
+                                               (StreamDiscretizer<?>) 
discretizer.getOperator(), matchingNodes));
                        }
                }
 
-               for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : 
matchingDiscretizers) {
-                       List<Integer> matchList = matching.f1;
+               for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> matching : 
matchingDiscretizers) {
+                       List<StreamNode> matchList = matching.f1;
                        if (matchList.size() > 1) {
-                               Integer first = matchList.get(0);
+                               StreamNode first = matchList.get(0);
                                for (int i = 1; i < matchList.size(); i++) {
-                                       replaceDiscretizer(streamGraph, 
matchList.get(i), first);
+                                       replaceDiscretizer(streamGraph, 
matchList.get(i).getID(), first.getID());
                                }
                        }
                }
@@ -132,8 +146,8 @@ public class WindowingOptimizer {
        private static void replaceDiscretizer(StreamGraph streamGraph, Integer 
toReplaceID,
                        Integer replaceWithID) {
                // Convert to array to create a copy
-               List<StreamEdge> outEdges = new 
ArrayList<StreamEdge>(streamGraph.getStreamNode(toReplaceID)
-                               .getOutEdges());
+               List<StreamEdge> outEdges = new 
ArrayList<StreamEdge>(streamGraph
+                               .getStreamNode(toReplaceID).getOutEdges());
 
                int numOutputs = outEdges.size();
 

Reply via email to