Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 d39bce9cb -> dde754246
Enhancements to execution planner Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dde75424 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dde75424 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dde75424 Branch: refs/heads/samza-fluent-api-v1 Commit: dde754246391729e00a19a15058525cdeb2fca1a Parents: 93c82f3 Author: Xinyu Liu <[email protected]> Authored: Mon Feb 27 12:25:23 2017 -0800 Committer: Xinyu Liu <[email protected]> Committed: Mon Feb 27 12:29:45 2017 -0800 ---------------------------------------------------------------------- .../samza/processorgraph/ExecutionPlanner.java | 34 ++++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/dde75424/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java index a990463..055f87c 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java @@ -59,7 +59,7 @@ public class ExecutionPlanner { ProcessorGraph processorGraph = splitStages(streamGraph); // figure out the partition for internal streams - Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, sysAdmins); + Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins); // create the streams createStreams(streams, sysAdmins); @@ -96,9 +96,12 @@ public class ExecutionPlanner { return processorGraph; } - private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { + private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { // fetch the external streams partition info - getExternalStreamPartitions(processorGraph, sysAdmins); + getExistingStreamPartitions(processorGraph, sysAdmins); + + // use BFS to figure out the join partition count + // TODO this algorithm assumes only one processor, and it does not consider join Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create(); @@ -112,9 +115,11 @@ public class ExecutionPlanner { int partition = Math.max(maxInPartition, maxOutPartition); outStreams.forEach(streamEdge -> { - streamEdge.setPartitions(partition); - StreamSpec streamSpec = createStreamSpec(streamEdge); - streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec); + if (streamEdge.getPartitions() == -1) { + streamEdge.setPartitions(partition); + StreamSpec streamSpec = createStreamSpec(streamEdge); + streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec); + } }); } }); @@ -122,16 +127,17 @@ public class ExecutionPlanner { return streamsGroupedBySystem; } - private void getExternalStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { - Set<StreamEdge> externalStreams = new HashSet<>(); - externalStreams.addAll(processorGraph.getSources()); - externalStreams.addAll(processorGraph.getSinks()); + private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { + Set<StreamEdge> allStreams = new HashSet<>(); + allStreams.addAll(processorGraph.getSources()); + allStreams.addAll(processorGraph.getSinks()); + allStreams.addAll(processorGraph.getInternalStreams()); Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create(); - externalStreams.forEach(streamEdge -> { - SystemStream systemStream = streamEdge.getSystemStream(); - externalStreamsMap.put(systemStream.getSystem(), streamEdge); - }); + allStreams.forEach(streamEdge -> { + SystemStream systemStream = streamEdge.getSystemStream(); + externalStreamsMap.put(systemStream.getSystem(), streamEdge); + }); for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) { String systemName = entry.getKey(); Collection<StreamEdge> streamEdges = entry.getValue();
