Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 d39bce9cb -> dde754246


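Enhancements to execution planner: look up partition counts for all existing streams (sources, sinks and internal streams), and only assign a partition count to, and create, output streams whose partition count is still -1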


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dde75424
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dde75424
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dde75424

Branch: refs/heads/samza-fluent-api-v1
Commit: dde754246391729e00a19a15058525cdeb2fca1a
Parents: 93c82f3
Author: Xinyu Liu <[email protected]>
Authored: Mon Feb 27 12:25:23 2017 -0800
Committer: Xinyu Liu <[email protected]>
Committed: Mon Feb 27 12:29:45 2017 -0800

----------------------------------------------------------------------
 .../samza/processorgraph/ExecutionPlanner.java  | 34 ++++++++++++--------
 1 file changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dde75424/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index a990463..055f87c 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -59,7 +59,7 @@ public class ExecutionPlanner {
     ProcessorGraph processorGraph = splitStages(streamGraph);
 
     // figure out the partition for internal streams
-    Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, 
sysAdmins);
+    Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, 
processorGraph, sysAdmins);
 
     // create the streams
     createStreams(streams, sysAdmins);
@@ -96,9 +96,12 @@ public class ExecutionPlanner {
     return processorGraph;
   }
 
-  private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph 
processorGraph, Map<String, SystemAdmin> sysAdmins) {
+  private Multimap<String, StreamSpec> calculatePartitions(StreamGraph 
streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) 
{
     // fetch the external streams partition info
-    getExternalStreamPartitions(processorGraph, sysAdmins);
+    getExistingStreamPartitions(processorGraph, sysAdmins);
+
+    // use BFS to figure out the join partition count
+
 
     // TODO this algorithm assumes only one processor, and it does not 
consider join
     Multimap<String, StreamSpec> streamsGroupedBySystem = 
HashMultimap.create();
@@ -112,9 +115,11 @@ public class ExecutionPlanner {
           int partition = Math.max(maxInPartition, maxOutPartition);
 
           outStreams.forEach(streamEdge -> {
-              streamEdge.setPartitions(partition);
-              StreamSpec streamSpec = createStreamSpec(streamEdge);
-              
streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), 
streamSpec);
+              if (streamEdge.getPartitions() == -1) {
+                streamEdge.setPartitions(partition);
+                StreamSpec streamSpec = createStreamSpec(streamEdge);
+                
streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), 
streamSpec);
+              }
             });
         }
       });
@@ -122,16 +127,17 @@ public class ExecutionPlanner {
     return streamsGroupedBySystem;
   }
 
-  private void getExternalStreamPartitions(ProcessorGraph processorGraph, 
Map<String, SystemAdmin> sysAdmins) {
-    Set<StreamEdge> externalStreams = new HashSet<>();
-    externalStreams.addAll(processorGraph.getSources());
-    externalStreams.addAll(processorGraph.getSinks());
+  private void getExistingStreamPartitions(ProcessorGraph processorGraph, 
Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> allStreams = new HashSet<>();
+    allStreams.addAll(processorGraph.getSources());
+    allStreams.addAll(processorGraph.getSinks());
+    allStreams.addAll(processorGraph.getInternalStreams());
 
     Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
-    externalStreams.forEach(streamEdge -> {
-        SystemStream systemStream = streamEdge.getSystemStream();
-        externalStreamsMap.put(systemStream.getSystem(), streamEdge);
-      });
+    allStreams.forEach(streamEdge -> {
+      SystemStream systemStream = streamEdge.getSystemStream();
+      externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+    });
     for (Map.Entry<String, Collection<StreamEdge>> entry : 
externalStreamsMap.asMap().entrySet()) {
       String systemName = entry.getKey();
       Collection<StreamEdge> streamEdges = entry.getValue();

Reply via email to