Repository: samza
Updated Branches:
  refs/heads/master 7b07df321 -> 4ea6b7e34


SAMZA-1007 - Broadcast streams incompatible with GroupBySystemStreamPartitionFactory


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ea6b7e3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ea6b7e3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ea6b7e3

Branch: refs/heads/master
Commit: 4ea6b7e3421772b291342d6fa48a0feebb75d5fc
Parents: 7b07df3
Author: Neil Fordyce <[email protected]>
Authored: Sat Sep 10 17:30:23 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Sat Sep 10 17:30:23 2016 -0700

----------------------------------------------------------------------
 .../grouper/stream/GroupBySystemStreamPartition.java | 6 ------
 .../grouper/stream/GroupBySystemStreamPartitionFactory.java | 2 +-
 .../grouper/stream/TestGroupBySystemStreamPartition.java | 8 ++++++--
 3 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index a8b41de..af96523 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -34,12 +34,6 @@ public class GroupBySystemStreamPartition implements SystemStreamPartitionGroupe
   private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
 
   /**
-   * default constructor
-   */
-  public GroupBySystemStreamPartition() {
-  }
-
-  /**
    * A constructor that accepts job config as the parameter
    *
    * @param config job config

http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
index 04a7444..54c2d27 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
@@ -25,7 +25,7 @@ public class GroupBySystemStreamPartitionFactory implements SystemStreamPartitio
 
   @Override
   public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
-    return new GroupBySystemStreamPartition();
+    return new GroupBySystemStreamPartition(config);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 1bd14a4..2bf6cee 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -39,12 +39,15 @@ public class TestGroupBySystemStreamPartition {
   SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
   SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
   SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+  GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory();
 
   @Test
   public void testLocalStreamGroupedCorrectly() {
     HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+    HashMap<String, String> configMap = new HashMap<String, String>();
+    Config config = new MapConfig(configMap);
 
-    GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition();
+    SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
     Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
     assertTrue(emptyResult.isEmpty());
 
@@ -79,7 +82,8 @@ public class TestGroupBySystemStreamPartition {
     HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
     Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
 
-    GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition(config);
+    GroupBySystemStreamPartitionFactory grouperFactory = new GroupBySystemStreamPartitionFactory();
+    SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
     Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
     Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
 
