Repository: samza Updated Branches: refs/heads/master 5a673604a -> 9d3a68794
SAMZA-946 - ConcurrentModificationException in TaskAssignmentManager when partition count changes. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d3a6879 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d3a6879 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d3a6879 Branch: refs/heads/master Commit: 9d3a687948d8e16550a91d8a560c88a3ac5e9e50 Parents: 5a67360 Author: Jacob Maes <[email protected]> Authored: Fri May 6 12:32:57 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Fri May 6 12:32:57 2016 -0700 ---------------------------------------------------------------------- .../samza/config/DefaultChooserConfig.java | 2 +- .../grouper/task/TaskAssignmentManager.java | 21 +++------- .../grouper/task/TestTaskAssignmentManager.java | 40 ++++++++++++++++++++ 3 files changed, 46 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java index d242d14..237c6f9 100644 --- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java @@ -64,7 +64,7 @@ public class DefaultChooserConfig extends MapConfig { /** * Gets the priority of every SystemStream for which the priority - * was explicitly configured with a value >=0. + * was explicitly configured with a value >=0. * * @return the explicitly-configured stream priorities as a map from * SystemStream to the configured priority value. Streams that http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index ec5cf3d..0cbdec8 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; * */ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager { private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class); - private Map<String, Integer> taskNameToContainerId = new HashMap<>(); + private final Map<String, Integer> taskNameToContainerId = new HashMap<>(); /** * Default constructor that creates a read-write manager @@ -51,16 +51,6 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager { super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager"); } - /** - * Special constructor that creates a write-only {@link TaskAssignmentManager} that only writes - * to coordinator stream in SamzaContainer - * - * @param coordinatorStreamSystemProducer producer to the coordinator stream - */ - public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) { - this(coordinatorStreamSystemProducer, null); - } - @Override public void register(TaskName taskName) { // taskName will not be used. This producer is global scope. @@ -75,24 +65,23 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager { * @return the map of taskName: containerId */ public Map<String, Integer> readTaskAssignment() { - Map<String, Integer> allMappings = new HashMap<>(); + taskNameToContainerId.clear(); for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) { if (message.isDelete()) { - allMappings.remove(message.getKey()); + taskNameToContainerId.remove(message.getKey()); log.debug("Got TaskContainerMapping delete message: {}", message); } else { SetTaskContainerMapping mapping = new SetTaskContainerMapping(message); - allMappings.put(mapping.getKey(), mapping.getTaskAssignment()); + taskNameToContainerId.put(mapping.getKey(), mapping.getTaskAssignment()); log.debug("Got TaskContainerMapping message: {}", mapping); } } - taskNameToContainerId = allMappings; for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) { log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue()); } - return Collections.unmodifiableMap(allMappings); + return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId)); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java index 7f83494..19ab78e 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java @@ -96,6 +96,46 @@ public class TestTaskAssignmentManager { assertTrue(consumer.isStopped()); } + @Test public void testDeleteMappings() throws Exception { + MockCoordinatorStreamSystemProducer producer = + mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null); + MockCoordinatorStreamSystemConsumer consumer = + mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null); + TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer); + + taskAssignmentManager.register(new TaskName("ignoredTaskName")); + assertTrue(producer.isRegistered()); + assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager"); + assertTrue(consumer.isRegistered()); + + taskAssignmentManager.start(); + assertTrue(producer.isStarted()); + assertTrue(consumer.isStarted()); + + Map<String, Integer> expectedMap = + new HashMap<String, Integer>() { + { + this.put("Task0", new Integer(0)); + this.put("Task1", new Integer(1)); + } + }; + + for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) { + taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue()); + } + + Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment(); + assertEquals(expectedMap, localMap); + + taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet()); + Map<String, Integer> deletedMap = taskAssignmentManager.readTaskAssignment(); + assertTrue(deletedMap.isEmpty()); + + taskAssignmentManager.stop(); + assertTrue(producer.isStopped()); + assertTrue(consumer.isStopped()); + } + @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception { MockCoordinatorStreamSystemProducer producer = mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
