Repository: samza Updated Branches: refs/heads/master 26dc77b08 -> e11ccd241
SAMZA-1347: GroupByContainerIds NPE if containerIds list is null Author: Jacob Maes <[email protected]> Reviewers: Boris Shkolnik <[email protected]> Closes #233 from jmakes/samza-1347 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e11ccd24 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e11ccd24 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e11ccd24 Branch: refs/heads/master Commit: e11ccd241c6dddd1ffb25e5debd4a639930a4660 Parents: 26dc77b Author: Jacob Maes <[email protected]> Authored: Tue Jun 27 11:42:51 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Tue Jun 27 11:42:51 2017 -0700 ---------------------------------------------------------------------- .../grouper/task/GroupByContainerIds.java | 15 ++++----- .../grouper/task/TestGroupByContainerIds.java | 35 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e11ccd24/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java index f2d88cd..651dca7 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -46,12 +46,6 @@ public class GroupByContainerIds implements TaskNameGrouper { @Override public Set<ContainerModel> group(Set<TaskModel> tasks) { - if (tasks.isEmpty()) - throw new IllegalArgumentException("cannot group an empty set"); - - if (startContainerCount > tasks.size()) - throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size()); - List<String> containerIds = new ArrayList<>(startContainerCount); for (int i = 0; i < startContainerCount; i++) { containerIds.add(String.valueOf(i)); @@ -60,6 +54,12 @@ public class GroupByContainerIds implements TaskNameGrouper { } public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) { + if (containersIds == null) + return this.group(tasks); + + if (containersIds.isEmpty()) + throw new IllegalArgumentException("Must have at least one container"); + if (tasks.isEmpty()) throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays .toString(containersIds.toArray())); @@ -67,9 +67,6 @@ public class GroupByContainerIds implements TaskNameGrouper { if (containersIds.size() > tasks.size()) throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size()); - if (containersIds == null) - return this.group(tasks); - int containerCount = containersIds.size(); // Sort tasks by taskName. http://git-wip-us.apache.org/repos/asf/samza/blob/e11ccd24/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java index 62131fe..cd2cc3d 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java @@ -20,6 +20,7 @@ package org.apache.samza.container.grouper.task; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -114,6 +115,40 @@ public class TestGroupByContainerIds { } @Test + public void testGroupWithNullContainerIds() { + Set<TaskModel> taskModels = generateTaskModels(5); + + Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels, null); + + Map<String, ContainerModel> containersMap = new HashMap<>(); + for (ContainerModel container : containers) { + containersMap.put(container.getProcessorId(), container); + } + + assertEquals(2, containers.size()); + ContainerModel container0 = containersMap.get("0"); + ContainerModel container1 = containersMap.get("1"); + assertNotNull(container0); + assertNotNull(container1); + assertEquals("0", container0.getProcessorId()); + assertEquals("1", container1.getProcessorId()); + assertEquals(3, container0.getTasks().size()); + assertEquals(2, container1.getTasks().size()); + assertTrue(container0.getTasks().containsKey(getTaskName(0))); + assertTrue(container0.getTasks().containsKey(getTaskName(2))); + assertTrue(container0.getTasks().containsKey(getTaskName(4))); + assertTrue(container1.getTasks().containsKey(getTaskName(1))); + assertTrue(container1.getTasks().containsKey(getTaskName(3))); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupWithEmptyContainerIds() { + Set<TaskModel> taskModels = generateTaskModels(5); + + buildSimpleGrouper(2).group(taskModels, Collections.emptyList()); + } + + @Test public void testGroupHappyPathWithListOfContainers() { Set<TaskModel> taskModels = generateTaskModels(5);
