Repository: samza
Updated Branches:
  refs/heads/master 7b07df321 -> 4ea6b7e34


SAMZA-1007 - Broadcast streams incompatible with 
GroupBySystemStreamPartitionFactory


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ea6b7e3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ea6b7e3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ea6b7e3

Branch: refs/heads/master
Commit: 4ea6b7e3421772b291342d6fa48a0feebb75d5fc
Parents: 7b07df3
Author: Neil Fordyce <[email protected]>
Authored: Sat Sep 10 17:30:23 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Sat Sep 10 17:30:23 2016 -0700

----------------------------------------------------------------------
 .../grouper/stream/GroupBySystemStreamPartition.java         | 6 ------
 .../grouper/stream/GroupBySystemStreamPartitionFactory.java  | 2 +-
 .../grouper/stream/TestGroupBySystemStreamPartition.java     | 8 ++++++--
 3 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index a8b41de..af96523 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -34,12 +34,6 @@ public class GroupBySystemStreamPartition implements 
SystemStreamPartitionGroupe
   private Set<SystemStreamPartition> broadcastStreams = new 
HashSet<SystemStreamPartition>();
 
   /**
-   * default constructor
-   */
-  public GroupBySystemStreamPartition() {
-  }
-
-  /**
    * A constructor that accepts job config as the parameter
    *
    * @param config job config

http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
index 04a7444..54c2d27 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
@@ -25,7 +25,7 @@ public class GroupBySystemStreamPartitionFactory implements 
SystemStreamPartitio
 
   @Override
   public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config) {
-    return new GroupBySystemStreamPartition();
+    return new GroupBySystemStreamPartition(config);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4ea6b7e3/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 1bd14a4..2bf6cee 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -39,12 +39,15 @@ public class TestGroupBySystemStreamPartition {
   SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", 
new Partition(1));
   SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", 
new Partition(2));
   SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", 
new Partition(0));
+  GroupBySystemStreamPartitionFactory grouperFactory = new 
GroupBySystemStreamPartitionFactory();
 
   @Test
   public void testLocalStreamGroupedCorrectly() {
     HashSet<SystemStreamPartition> allSSPs = new 
HashSet<SystemStreamPartition>();
+    HashMap<String, String> configMap = new HashMap<String, String>();
+    Config config = new MapConfig(configMap);
 
-    GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition();
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
     Map<TaskName, Set<SystemStreamPartition>> emptyResult = 
grouper.group(allSSPs);
     assertTrue(emptyResult.isEmpty());
 
@@ -79,7 +82,8 @@ public class TestGroupBySystemStreamPartition {
 
     HashSet<SystemStreamPartition> allSSPs = new 
HashSet<SystemStreamPartition>();
     Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
-    GroupBySystemStreamPartition grouper = new 
GroupBySystemStreamPartition(config);
+    GroupBySystemStreamPartitionFactory grouperFactory = new 
GroupBySystemStreamPartitionFactory();
+    SystemStreamPartitionGrouper grouper = 
grouperFactory.getSystemStreamPartitionGrouper(config);
     Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
 
     Map<TaskName, Set<SystemStreamPartition>> expectedResult = new 
HashMap<TaskName, Set<SystemStreamPartition>>();

Reply via email to