SAMZA-1879: Remove deprecated containerId from ContainerModel Author: Cameron Lee <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>, Shanthoosh Venkatraman <[email protected]> Closes #639 from cameronlee314/remove_container_id Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3bb24c8e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3bb24c8e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3bb24c8e Branch: refs/heads/NewKafkaSystemConsumer Commit: 3bb24c8eec40099dbc177df7c77f2793d7d70653 Parents: 160927a Author: Cameron Lee <[email protected]> Authored: Wed Sep 19 12:18:46 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Sep 19 12:18:46 2018 -0700 ---------------------------------------------------------------------- .../grouper/task/GroupByContainerCount.java | 6 +- .../grouper/task/GroupByContainerIds.java | 4 +- .../task/SingleContainerGrouperFactory.java | 2 +- .../apache/samza/job/model/ContainerModel.java | 31 +-- .../org/apache/samza/job/model/JobModel.java | 2 - .../model/JsonContainerModelMixIn.java | 38 ++-- .../serializers/model/JsonJobModelMixIn.java | 2 + .../serializers/model/JsonTaskModelMixIn.java | 2 + .../serializers/model/SamzaObjectMapper.java | 41 ++-- .../apache/samza/storage/StorageRecovery.java | 2 +- .../samza/coordinator/JobModelManager.scala | 2 +- .../grouper/task/TestGroupByContainerCount.java | 89 ++++---- .../grouper/task/TestGroupByContainerIds.java | 37 ++-- .../samza/container/mock/ContainerMocks.java | 6 +- .../coordinator/JobModelManagerTestUtil.java | 13 +- .../apache/samza/job/model/TestJobModel.java | 4 +- .../operators/impl/TestOperatorImplGraph.java | 8 +- .../model/TestSamzaObjectMapper.java | 205 +++++++++++++++---- .../samza/container/TestSamzaContainer.scala | 12 +- .../samza/coordinator/TestJobCoordinator.scala | 12 +- .../webapp/TestApplicationMasterRestClient.java | 2 +- 21 files changed, 310 insertions(+), 210 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java index 74c69d6..b4d6c90 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java @@ -78,7 +78,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { // Convert to a Set of ContainerModel Set<ContainerModel> containerModels = new HashSet<>(); for (int i = 0; i < containerCount; i++) { - containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i])); + containerModels.add(new ContainerModel(String.valueOf(i), taskGroups[i])); } return Collections.unmodifiableSet(containerModels); @@ -194,7 +194,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) { for (ContainerModel container : containers) { for (TaskName taskName : container.getTasks().keySet()) { - taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId()); + taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getId()); } } } @@ -301,7 +301,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { containerTaskModels.put(model.getTaskName(), model); } containerModels.add( - new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels)); + new ContainerModel(container.containerId, 
containerTaskModels)); } return Collections.unmodifiableSet(containerModels); } http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java index f5a5a86..9dab943 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -100,9 +100,7 @@ public class GroupByContainerIds implements TaskNameGrouper { // Convert to a Set of ContainerModel Set<ContainerModel> containerModels = new HashSet<>(); for (int i = 0; i < containerCount; i++) { - // containerId in ContainerModel constructor is set to -1 because processorId can be any string and does - // not have an integer equivalent. So, we set it to -1. After 0.13, this parameter will be removed. 
- containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i])); + containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i])); } return Collections.unmodifiableSet(containerModels); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java index 15cd224..ee962d5 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java @@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper { for (TaskModel taskModel: taskModels) { taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel); } - ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap); + ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap); return Collections.singleton(containerModel); } } http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java index bd4fa94..980806b 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java @@ -19,10 +19,9 @@ package org.apache.samza.job.model; -import org.apache.samza.container.TaskName; - import java.util.Collections; 
import java.util.Map; +import org.apache.samza.container.TaskName; /** * <p> @@ -41,28 +40,16 @@ import java.util.Map; * </p> */ public class ContainerModel { - @Deprecated - private final int containerId; - private final String processorId; + private final String id; private final Map<TaskName, TaskModel> tasks; - public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) { - this.containerId = containerId; - if (processorId == null) { - this.processorId = String.valueOf(containerId); - } else { - this.processorId = processorId; - } + public ContainerModel(String id, Map<TaskName, TaskModel> tasks) { + this.id = id; this.tasks = Collections.unmodifiableMap(tasks); } - @Deprecated - public int getContainerId() { - return containerId; - } - - public String getProcessorId() { - return processorId; + public String getId() { + return id; } public Map<TaskName, TaskModel> getTasks() { @@ -71,14 +58,14 @@ public class ContainerModel { @Override public String toString() { - return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]"; + return "ContainerModel [id=" + id + ", tasks=" + tasks + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((processorId == null) ? 0 : processorId.hashCode()); + result = prime * result + ((id == null) ? 0 : id.hashCode()); result = prime * result + ((tasks == null) ? 
0 : tasks.hashCode()); return result; } @@ -92,7 +79,7 @@ public class ContainerModel { if (getClass() != obj.getClass()) return false; ContainerModel other = (ContainerModel) obj; - if (!processorId.equals(other.processorId)) + if (!id.equals(other.id)) return false; if (tasks == null) { if (other.tasks != null) http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java index d2f8fda..be26f10 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.samza.config.Config; import org.apache.samza.container.LocalityManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; /** * <p> @@ -39,7 +38,6 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties; * an id, partition information, etc. 
* </p> */ -@JsonIgnoreProperties(ignoreUnknown = true) public class JobModel { private static final String EMPTY_STRING = ""; private final Config config; http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java index e19afec..f18c42a 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java @@ -19,29 +19,35 @@ package org.apache.samza.serializers.model; +import java.util.Map; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.TaskModel; -import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -import java.util.Map; - /** - * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON. + * A mix-in Jackson class to convert {@link org.apache.samza.job.model.ContainerModel} to JSON. + * Notes: + * 1) Constructor is not needed because this mixin is not used for deserialization. See {@link SamzaObjectMapper}. + * 2) It is unnecessary to use {@link org.codehaus.jackson.annotate.JsonIgnoreProperties#ignoreUnknown()} here since + * {@link SamzaObjectMapper} already uses custom deserialization code for the + * {@link org.apache.samza.job.model.ContainerModel}. + * 3) See {@link SamzaObjectMapper} for more context about why the JSON keys are named in this specified way. 
*/ public abstract class JsonContainerModelMixIn { - @JsonCreator - public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) { - } - - @Deprecated - @JsonProperty("container-id") - abstract int getContainerId(); - - @JsonProperty("processor-id") - abstract String getProcessorId(); - - @JsonProperty("tasks") + /** + * This is intentionally not "id" for backwards compatibility reasons. See {@link SamzaObjectMapper} for more details. + */ + static final String PROCESSOR_ID_KEY = "processor-id"; + /** + * This is used for backwards compatibility. See {@link SamzaObjectMapper} for more details. + */ + static final String CONTAINER_ID_KEY = "container-id"; + static final String TASKS_KEY = "tasks"; + + @JsonProperty(PROCESSOR_ID_KEY) + abstract String getId(); + + @JsonProperty(TASKS_KEY) abstract Map<TaskName, TaskModel> getTasks(); } http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java index 4b0c404..c40173a 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java @@ -23,11 +23,13 @@ import java.util.Map; import org.apache.samza.config.Config; import org.apache.samza.job.model.ContainerModel; import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonProperty; /** * A mix-in Jackson class to convert Samza's JobModel to/from JSON. 
*/ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class JsonJobModelMixIn { @JsonCreator public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<String, ContainerModel> containers) { http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java index 3ebe391..13a7d59 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java @@ -25,11 +25,13 @@ import org.apache.samza.Partition; import org.apache.samza.container.TaskName; import org.apache.samza.system.SystemStreamPartition; import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonProperty; /** * A mix-in Jackson class to convert Samza's TaskModel to/from JSON. 
*/ +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class JsonTaskModelMixIn { @JsonCreator public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) { http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java index 53b59b2..15206e1 100644 --- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java +++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java @@ -19,6 +19,9 @@ package org.apache.samza.serializers.model; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -48,10 +51,6 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod; import org.codehaus.jackson.map.module.SimpleModule; import org.codehaus.jackson.type.TypeReference; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - /** * <p> * A collection of utility classes and (de)serializers to make Samza's job model @@ -59,7 +58,7 @@ import java.util.Map; * Jackson-specific code is isolated so that Samza's core data model does not * require a direct dependency on Jackson. * </p> - * + * * <p> * To use Samza's job data model, use the SamzaObjectMapper.getObjectMapper() * method. 
@@ -99,19 +98,31 @@ public class SamzaObjectMapper { public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { ObjectCodec oc = jp.getCodec(); JsonNode node = oc.readTree(jp); - int containerId = node.get("container-id").getIntValue(); - if (node.get("container-id") == null) { - throw new SamzaException("JobModel did not contain a container-id. This can never happen. JobModel corrupt!"); - } - String processorId; - if (node.get("processor-id") == null) { - processorId = String.valueOf(containerId); + /* + * Before Samza 0.13, "container-id" was used. + * In Samza 0.13, "processor-id" was added to be the id to use and "container-id" was deprecated. However, + * "container-id" still needed to be checked for backwards compatibility in case "processor-id" was missing + * (i.e. from a job model corresponding to a version of the job that was on a pre Samza 0.13 version). + * In Samza 1.0, "container-id" was further cleaned up from ContainerModel. This logic is still being left here + * as a fallback for backwards compatibility with pre Samza 0.13. ContainerModel.getProcessorId was changed to + * ContainerModel.getId in the Java API, but "processor-id" still needs to be used as the JSON key for backwards + * compatibility with Samza 0.13 and Samza 0.14. + */ + String id; + if (node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY) == null) { + if (node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY) == null) { + throw new SamzaException( + String.format("JobModel was missing %s and %s. This should never happen. 
JobModel corrupt!", + JsonContainerModelMixIn.PROCESSOR_ID_KEY, JsonContainerModelMixIn.CONTAINER_ID_KEY)); + } + id = String.valueOf(node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY).getIntValue()); } else { - processorId = node.get("processor-id").getTextValue(); + id = node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY).getTextValue(); } Map<TaskName, TaskModel> tasksMapping = - OBJECT_MAPPER.readValue(node.get("tasks"), new TypeReference<Map<TaskName, TaskModel>>() { }); - return new ContainerModel(processorId, containerId, tasksMapping); + OBJECT_MAPPER.readValue(node.get(JsonContainerModelMixIn.TASKS_KEY), + new TypeReference<Map<TaskName, TaskModel>>() { }); + return new ContainerModel(id, tasksMapping); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index f9c6c0c..bf46018 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -209,7 +209,7 @@ public class StorageRecovery extends CommandLine { for (ContainerModel containerModel : containers.values()) { HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>(); - SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks() + SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getId(), jobConfig, containerModel.getTasks() .keySet(), new MetricsRegistryMap()); for (TaskModel taskModel : containerModel.getTasks().values()) { http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index f939736..f7698c0 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -208,7 +208,7 @@ object JobModelManager extends Logging { case _ => containerGrouper.group(taskModels.asJava, containerIds) } } - val containerMap = containerModels.asScala.map { case (containerModel) => containerModel.getProcessorId -> containerModel }.toMap + val containerMap = containerModels.asScala.map { case (containerModel) => containerModel.getId -> containerModel }.toMap if (isHostAffinityEnabled) { new JobModel(config, containerMap.asJava, localityManager) http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java index e89d673..8d2d394 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java @@ -18,33 +18,22 @@ */ package org.apache.samza.container.grouper.task; -import org.apache.samza.SamzaException; -import org.apache.samza.container.LocalityManager; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.TaskModel; -import org.junit.Before; -import org.junit.Test; - import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; 
import java.util.Set; import java.util.UUID; +import org.apache.samza.SamzaException; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; +import org.junit.Before; +import org.junit.Test; -import static org.apache.samza.container.mock.ContainerMocks.generateTaskContainerMapping; -import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels; -import static org.apache.samza.container.mock.ContainerMocks.getTaskModel; -import static org.apache.samza.container.mock.ContainerMocks.getTaskName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.anyCollection; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.apache.samza.container.mock.ContainerMocks.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public class TestGroupByContainerCount { private TaskAssignmentManager taskAssignmentManager; @@ -83,7 +72,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -91,8 +80,8 @@ public class TestGroupByContainerCount { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); 
assertTrue(container0.getTasks().containsKey(getTaskName(0))); @@ -110,7 +99,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -118,8 +107,8 @@ public class TestGroupByContainerCount { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(11, container0.getTasks().size()); assertEquals(10, container1.getTasks().size()); @@ -182,7 +171,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(4, containers.size()); @@ -194,8 +183,8 @@ public class TestGroupByContainerCount { assertNotNull(container1); assertNotNull(container2); assertNotNull(container3); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); assertEquals(2, container2.getTasks().size()); @@ -264,7 +253,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -272,8 +261,8 @@ public class TestGroupByContainerCount { ContainerModel 
container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(5, container0.getTasks().size()); assertEquals(4, container1.getTasks().size()); @@ -343,7 +332,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -351,8 +340,8 @@ public class TestGroupByContainerCount { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(5, container0.getTasks().size()); assertEquals(4, container1.getTasks().size()); @@ -397,7 +386,7 @@ public class TestGroupByContainerCount { containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(3, containers.size()); @@ -407,9 +396,9 @@ public class TestGroupByContainerCount { assertNotNull(container0); assertNotNull(container1); assertNotNull(container2); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); - assertEquals("2", container2.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); + assertEquals("2", container2.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(3, container1.getTasks().size()); assertEquals(3, container2.getTasks().size()); @@ -474,7 +463,7 @@ public 
class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -482,8 +471,8 @@ public class TestGroupByContainerCount { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(5, container0.getTasks().size()); assertEquals(4, container1.getTasks().size()); @@ -544,7 +533,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -552,8 +541,8 @@ public class TestGroupByContainerCount { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(6, container0.getTasks().size()); assertEquals(3, container1.getTasks().size()); @@ -610,7 +599,7 @@ public class TestGroupByContainerCount { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(3, containers.size()); @@ -620,9 +609,9 @@ public class TestGroupByContainerCount { assertNotNull(container0); assertNotNull(container1); assertNotNull(container2); - assertEquals("0", 
container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); - assertEquals("2", container2.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); + assertEquals("2", container2.getId()); assertEquals(2, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); assertEquals(2, container1.getTasks().size()); @@ -782,7 +771,7 @@ public class TestGroupByContainerCount { Set<ContainerModel> prevContainers = new HashSet<>(); taskModels.forEach(model -> { prevContainers.add( - new ContainerModel(UUID.randomUUID().toString(), -1, Collections.singletonMap(model.getTaskName(), model))); + new ContainerModel(UUID.randomUUID().toString(), Collections.singletonMap(model.getTaskName(), model))); }); Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers); when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java index 13afeef..b9fe6fb 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java @@ -38,13 +38,9 @@ import org.apache.samza.job.model.TaskModel; import org.junit.Before; import org.junit.Test; -import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels; -import static org.apache.samza.container.mock.ContainerMocks.getTaskName; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static 
org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.apache.samza.container.mock.ContainerMocks.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public class TestGroupByContainerIds { @@ -91,7 +87,7 @@ public class TestGroupByContainerIds { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -99,8 +95,8 @@ public class TestGroupByContainerIds { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); assertTrue(container0.getTasks().containsKey(getTaskName(0))); @@ -118,7 +114,7 @@ public class TestGroupByContainerIds { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -126,8 +122,8 @@ public class TestGroupByContainerIds { ContainerModel container1 = containersMap.get("1"); assertNotNull(container0); assertNotNull(container1); - assertEquals("0", container0.getProcessorId()); - assertEquals("1", container1.getProcessorId()); + assertEquals("0", container0.getId()); + assertEquals("1", container1.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); assertTrue(container0.getTasks().containsKey(getTaskName(0))); @@ -159,7 +155,7 @@ public class TestGroupByContainerIds { Map<String, 
ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -167,8 +163,8 @@ public class TestGroupByContainerIds { ContainerModel container1 = containersMap.get("2"); assertNotNull(container0); assertNotNull(container1); - assertEquals("4", container0.getProcessorId()); - assertEquals("2", container1.getProcessorId()); + assertEquals("4", container0.getId()); + assertEquals("2", container1.getId()); assertEquals(3, container0.getTasks().size()); assertEquals(2, container1.getTasks().size()); assertTrue(container0.getTasks().containsKey(getTaskName(0))); @@ -195,7 +191,7 @@ public class TestGroupByContainerIds { Map<String, ContainerModel> containersMap = new HashMap<>(); for (ContainerModel container : containers) { - containersMap.put(container.getProcessorId(), container); + containersMap.put(container.getId(), container); } assertEquals(2, containers.size()); @@ -203,8 +199,8 @@ public class TestGroupByContainerIds { ContainerModel container1 = containersMap.get("2"); assertNotNull(container0); assertNotNull(container1); - assertEquals("4", container0.getProcessorId()); - assertEquals("2", container1.getProcessorId()); + assertEquals("4", container0.getId()); + assertEquals("2", container1.getId()); assertEquals(11, container0.getTasks().size()); assertEquals(10, container1.getTasks().size()); @@ -238,14 +234,13 @@ public class TestGroupByContainerIds { public void testFewerTasksThanContainers() { final String testContainerId1 = "1"; final String testContainerId2 = "2"; - final int testProcessorId = 1; Set<TaskModel> taskModels = generateTaskModels(1); List<String> containerIds = ImmutableList.of(testContainerId1, testContainerId2); Map<TaskName, TaskModel> expectedTasks = taskModels.stream() .collect(Collectors.toMap(TaskModel::getTaskName, x -> x)); - ContainerModel 
expectedContainerModel = new ContainerModel(testContainerId1, testProcessorId, expectedTasks); + ContainerModel expectedContainerModel = new ContainerModel(testContainerId1, expectedTasks); Set<ContainerModel> actualContainerModels = buildSimpleGrouper().group(taskModels, containerIds); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java index 9369f4b..ca9def2 100644 --- a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java +++ b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java @@ -56,7 +56,7 @@ public class ContainerMocks { Set<ContainerModel> containers = generateContainerModels(numContainers, taskCount); for (ContainerModel container : containers) { for (TaskName taskName : container.getTasks().keySet()) { - mapping.put(taskName.getTaskName(), container.getProcessorId()); + mapping.put(taskName.getTaskName(), container.getId()); } } return mapping; @@ -78,7 +78,7 @@ public class ContainerMocks { for (int partition : partitions) { tasks.put(getTaskName(partition), getTaskModel(partition)); } - return new ContainerModel(containerId, -1, tasks); + return new ContainerModel(containerId, tasks); } public static Set<TaskModel> generateTaskModels(int[] partitions) { @@ -121,7 +121,7 @@ public class ContainerMocks { Map<String, String> taskMapping = new HashMap<>(); for (ContainerModel container : containers) { for (TaskName taskName : container.getTasks().keySet()) { - taskMapping.put(taskName.getTaskName(), container.getProcessorId()); + taskMapping.put(taskName.getTaskName(), container.getId()); } } return taskMapping; 
http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java index b7514c4..ea25ec1 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java @@ -19,20 +19,17 @@ package org.apache.samza.coordinator; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.samza.config.Config; import org.apache.samza.container.LocalityManager; -import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.server.HttpServer; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.system.StreamMetadataCache; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Utils to create instances of {@link JobModelManager} in unit tests */ @@ -45,7 +42,7 @@ public class JobModelManagerTestUtil { public static JobModelManager getJobModelManagerWithLocalityManager(Config config, int containerCount, LocalityManager localityManager, HttpServer server) { Map<String, ContainerModel> containers = new java.util.HashMap<>(); for (int i = 0; i < containerCount; i++) { - ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>()); + ContainerModel container = new ContainerModel(String.valueOf(i), new HashMap<>()); containers.put(String.valueOf(i), container); } JobModel jobModel = new JobModel(config, containers, localityManager); 
http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java index 6c7c282..77fe639 100644 --- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java +++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java @@ -41,8 +41,8 @@ public class TestJobModel { new TaskName("t3"), new TaskModel(new TaskName("t3"), ImmutableSet.of(), new Partition(2)), new TaskName("t4"), new TaskModel(new TaskName("t4"), ImmutableSet.of(), new Partition(3)), new TaskName("t5"), new TaskModel(new TaskName("t5"), ImmutableSet.of(), new Partition(4))); - ContainerModel containerModel1 = new ContainerModel("0", 0, tasksForContainer1); - ContainerModel containerModel2 = new ContainerModel("1", 1, tasksForContainer2); + ContainerModel containerModel1 = new ContainerModel("0", tasksForContainer1); + ContainerModel containerModel2 = new ContainerModel("1", tasksForContainer2); Map<String, ContainerModel> containers = ImmutableMap.of("0", containerModel1, "1", containerModel2); JobModel jobModel = new JobModel(config, containers); assertEquals(jobModel.maxChangeLogStreamPartitions, 5); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 6f8a8bc..bd9b2d6 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -552,13 +552,13 @@ public class TestOperatorImplGraph { ssps.add(ssp0); ssps.add(ssp2); TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0)); - ContainerModel cm0 = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0)); + ContainerModel cm0 = new ContainerModel("c0", Collections.singletonMap(task0, tm0)); TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1)); - ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1)); + ContainerModel cm1 = new ContainerModel("c1", Collections.singletonMap(task1, tm1)); Map<String, ContainerModel> cms = new HashMap<>(); - cms.put(cm0.getProcessorId(), cm0); - cms.put(cm1.getProcessorId(), cm1); + cms.put(cm0.getId(), cm0); + cms.put(cm1.getId(), cm1); JobModel jobModel = new JobModel(config, cms, null); Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel); http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java index 02c3a9d..0f90dd5 100644 --- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java +++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java @@ -19,8 +19,13 @@ package org.apache.samza.serializers.model; -import junit.framework.Assert; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.Map; +import java.util.Set; import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import 
org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskName; @@ -29,76 +34,186 @@ import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; import org.apache.samza.system.SystemStreamPartition; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; +import org.codehaus.jackson.node.ObjectNode; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; public class TestSamzaObjectMapper { private JobModel jobModel; + private ObjectMapper samzaObjectMapper; @Before - public void setup() throws IOException { - Map<String, String> configMap = new HashMap<String, String>(); - Set<SystemStreamPartition> ssp = new HashSet<>(); - configMap.put("a", "b"); - Config config = new MapConfig(configMap); + public void setup() { + Config config = new MapConfig(ImmutableMap.of("a", "b")); TaskName taskName = new TaskName("test"); - ssp.add(new SystemStreamPartition("foo", "bar", new Partition(1))); - TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2)); - Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>(); - tasks.put(taskName, taskModel); - ContainerModel containerModel = new ContainerModel("1", 1, tasks); - Map<String, ContainerModel> containerMap = new HashMap<String, ContainerModel>(); - containerMap.put("1", containerModel); - jobModel = new JobModel(config, containerMap); + Set<SystemStreamPartition> ssps = ImmutableSet.of(new SystemStreamPartition("foo", "bar", new Partition(1))); + TaskModel taskModel = new TaskModel(taskName, ssps, new Partition(2)); + Map<TaskName, TaskModel> tasks = ImmutableMap.of(taskName, taskModel); + ContainerModel containerModel = new ContainerModel("1", tasks); + Map<String, ContainerModel> 
containerMap = ImmutableMap.of("1", containerModel); + this.jobModel = new JobModel(config, containerMap); + this.samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); } @Test - public void testJsonTaskModel() throws Exception { - ObjectMapper mapper = SamzaObjectMapper.getObjectMapper(); + public void testSerializeJobModel() throws IOException { + String serializedString = this.samzaObjectMapper.writeValueAsString(this.jobModel); + // use a plain ObjectMapper to read JSON to make comparison easier + ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString); + ObjectNode expectedJson = buildJobModelJson(); + + /* + * Jackson serializes all get* methods even if they aren't regular getters. We only care about certain fields now + * since those are the only ones that get deserialized. + */ + assertEquals(expectedJson.get("config"), serializedAsJson.get("config")); + assertEquals(expectedJson.get("containers"), serializedAsJson.get("containers")); + } + + @Test + public void testDeserializeJobModel() throws IOException { + ObjectNode asJson = buildJobModelJson(); + assertEquals(this.jobModel, deserializeFromObjectNode(asJson)); + } - String str = mapper.writeValueAsString(jobModel); - JobModel obj = mapper.readValue(str, JobModel.class); - assertEquals(jobModel, obj); + /** + * Deserialization should not fail if there are fields which are ignored. 
+ */ + @Test + public void testDeserializeWithIgnoredFields() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + // JobModel ignores all unknown fields + jobModelJson.put("unknown_job_model_key", "unknown_job_model_value"); + ObjectNode taskPartitionMappings = new ObjectMapper().createObjectNode(); + taskPartitionMappings.put("1", (Integer) null); + // old key that used to be serialized + jobModelJson.put("task-partition-mappings", taskPartitionMappings); + ObjectNode allContainerLocality = new ObjectMapper().createObjectNode(); + allContainerLocality.put("1", (Integer) null); + // currently gets serialized since there is a getAllContainerLocality + jobModelJson.put("all-container-locality", allContainerLocality); + assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); } /** - * Critical test to guarantee compatibility between samza 0.12 container models and 0.13+ - * - * Samza 0.12 contains only "container-id" (integer) in the ContainerModel. "processor-id" (String) is added in 0.13. - * When serializing, we serialize both the fields in 0.13. Deserialization correctly handles the fields in 0.13. + * Given a {@link ContainerModel} JSON with a processor-id and a container-id, deserialization should properly ignore + * the container-id. 
*/ @Test - public void testContainerModelCompatible() throws Exception { - String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}"; - ObjectMapper mapper = SamzaObjectMapper.getObjectMapper(); - JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class); + public void testDeserializeContainerIdAndProcessorId() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); + containerModelJson.put("container-id", 123); + assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); + } - String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}"; - ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper(); - JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class); + /** + * Given a {@link ContainerModel} JSON with an unknown field, deserialization should properly ignore it. 
+ */ + @Test + public void testDeserializeUnknownContainerModelField() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); + containerModelJson.put("unknown_container_model_key", "unknown_container_model_value"); + assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); + } - Assert.assertEquals(jobModel, jobModel1); + /** + * Given a {@link ContainerModel} JSON without a processor-id but with a container-id, deserialization should use the + * container-id to calculate the processor-id. + */ + @Test + public void testDeserializeContainerModelOnlyContainerId() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); + containerModelJson.remove("processor-id"); + containerModelJson.put("container-id", 1); + assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); } + /** + * Given a {@link TaskModel} JSON with an unknown field, deserialization should properly ignore it.
+ */ @Test - public void testUnknownFieldsInJobModelJsonDoesNotFailDeserialization() throws Exception { - String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}, \"task-partition-mapping\":{\"1\":null}}"; - ObjectMapper mapper = SamzaObjectMapper.getObjectMapper(); - JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class); + public void testDeserializeUnknownTaskModelField() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode taskModelJson = (ObjectNode) jobModelJson.get("containers").get("1").get("tasks").get("test"); + taskModelJson.put("unknown_task_model_key", "unknown_task_model_value"); + assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson)); + } - String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}"; - ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper(); - JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class); + /** + * Given a {@link ContainerModel} JSON with neither a processor-id nor a container-id, deserialization should fail. 
+ */ + @Test(expected = SamzaException.class) + public void testDeserializeContainerModelMissingProcessorIdAndContainerId() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); + containerModelJson.remove("processor-id"); + deserializeFromObjectNode(jobModelJson); + } - Assert.assertEquals(jobModel, jobModel1); + /** + * Given a {@link ContainerModel} JSON with only an "id" field, deserialization should fail. + * This verifies that even though {@link ContainerModel} has a getId method, the "id" field is not used, since + * "processor-id" is the field that is supposed to be used. + */ + @Test(expected = SamzaException.class) + public void testDeserializeContainerModelIdFieldOnly() throws IOException { + ObjectNode jobModelJson = buildJobModelJson(); + ObjectNode containerModelJson = (ObjectNode) jobModelJson.get("containers").get("1"); + containerModelJson.remove("processor-id"); + containerModelJson.put("id", 1); + deserializeFromObjectNode(jobModelJson); } + private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException { + // use plain ObjectMapper to get JSON string + String jsonString = new ObjectMapper().writeValueAsString(jobModelJson); + return this.samzaObjectMapper.readValue(jsonString, JobModel.class); + } + + /** + * Builds {@link ObjectNode} which matches the {@link JobModel} built in setup. 
+ */ + private static ObjectNode buildJobModelJson() { + ObjectMapper objectMapper = new ObjectMapper(); + + ObjectNode configJson = objectMapper.createObjectNode(); + configJson.put("a", "b"); + + ObjectNode containerModel1TaskTestSSPJson = objectMapper.createObjectNode(); + containerModel1TaskTestSSPJson.put("system", "foo"); + containerModel1TaskTestSSPJson.put("stream", "bar"); + containerModel1TaskTestSSPJson.put("partition", 1); + + ArrayNode containerModel1TaskTestSSPsJson = objectMapper.createArrayNode(); + containerModel1TaskTestSSPsJson.add(containerModel1TaskTestSSPJson); + + ObjectNode containerModel1TaskTestJson = objectMapper.createObjectNode(); + containerModel1TaskTestJson.put("task-name", "test"); + containerModel1TaskTestJson.put("system-stream-partitions", containerModel1TaskTestSSPsJson); + containerModel1TaskTestJson.put("changelog-partition", 2); + + ObjectNode containerModel1TasksJson = objectMapper.createObjectNode(); + containerModel1TasksJson.put("test", containerModel1TaskTestJson); + + ObjectNode containerModel1Json = objectMapper.createObjectNode(); + // important: needs to be "processor-id" for compatibility between Samza 0.14 and 1.0 + containerModel1Json.put("processor-id", "1"); + containerModel1Json.put("tasks", containerModel1TasksJson); + + ObjectNode containersJson = objectMapper.createObjectNode(); + containersJson.put("1", containerModel1Json); + + ObjectNode jobModelJson = objectMapper.createObjectNode(); + jobModelJson.put("config", configJson); + jobModelJson.put("containers", containersJson); + + return jobModelJson; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 30ca8c1..57c0bf0 
100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -181,8 +181,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)), new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0))) val containers = Map( - "0" -> new ContainerModel("0", 0, tasks), - "1" -> new ContainerModel("1", 0, tasks)) + "0" -> new ContainerModel("0", tasks), + "1" -> new ContainerModel("1", tasks)) val jobModel = new JobModel(config, containers) def jobModelGenerator(): JobModel = jobModel val server = new HttpServer @@ -206,8 +206,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)), new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0))) val containers = Map( - "0" -> new ContainerModel("0", 0, tasks), - "1" -> new ContainerModel("1", 1, tasks)) + "0" -> new ContainerModel("0", tasks), + "1" -> new ContainerModel("1", tasks)) val jobModel = new JobModel(config, containers) def jobModelGenerator(): JobModel = jobModel val server = new HttpServer @@ -234,7 +234,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { val taskModel1 = new TaskModel(taskName1, Set(new SystemStreamPartition("input", "stream", new Partition(1))), new Partition(11)) - val containerModel = new ContainerModel("processorId", 0, Map(taskName0 -> taskModel0, taskName1 -> taskModel1)) + val containerModel = new ContainerModel("processorId", Map(taskName0 -> taskModel0, taskName1 -> taskModel1)) val changeLogSystemStreams = Map("store0" -> new SystemStream("changelogSystem0", "store0-changelog"), "store1" -> new SystemStream("changelogSystem1", "store1-changelog")) val expected = 
Set(new SystemStreamPartition("changelogSystem0", "store0-changelog", new Partition(10)), @@ -254,7 +254,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { val taskModel1 = new TaskModel(taskName1, Set(new SystemStreamPartition("input", "stream", new Partition(1))), new Partition(11)) - val containerModel = new ContainerModel("processorId", 0, Map(taskName0 -> taskModel0, taskName1 -> taskModel1)) + val containerModel = new ContainerModel("processorId", Map(taskName0 -> taskModel0, taskName1 -> taskModel1)) assertEquals(Set(), SamzaContainer.getChangelogSSPsForContainer(containerModel, Map())) } http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 42610ae..2488355 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -69,8 +69,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { val container1Tasks = Map( task1Name -> new TaskModel(task1Name, checkpoint1.keySet.asJava, new Partition(3))) val containers = Map( - "0" -> new ContainerModel("0", 0, container0Tasks.asJava), - "1" -> new ContainerModel("1", 1, container1Tasks.asJava)) + "0" -> new ContainerModel("0", container0Tasks.asJava), + "1" -> new ContainerModel("1", container1Tasks.asJava)) // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition @@ -137,8 +137,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { val container1Tasks = Map( task1Name -> new TaskModel(task1Name, ssp1.asJava, new 
Partition(3))) val containers = Map( - Integer.valueOf(0) -> new ContainerModel("0", 0, container0Tasks.asJava), - Integer.valueOf(1) -> new ContainerModel("1", 1, container1Tasks.asJava)) + Integer.valueOf(0) -> new ContainerModel("0", container0Tasks.asJava), + Integer.valueOf(1) -> new ContainerModel("1", container1Tasks.asJava)) val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4" // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as @@ -196,7 +196,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { val container0Tasks = Map( task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0))) val containers = Map( - "0" -> new ContainerModel("0", 0, container0Tasks.asJava)) + "0" -> new ContainerModel("0", container0Tasks.asJava)) val jobModel = new JobModel(config, containers.asJava) assertEquals(config, coordinator.jobModel.getConfig) assertEquals(jobModel, coordinator.jobModel) @@ -218,7 +218,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0))) val containers = Map( - "0" -> new ContainerModel("0", 0, container0Tasks.asJava)) + "0" -> new ContainerModel("0", container0Tasks.asJava)) val jobModel = new JobModel(config, containers.asJava) assertEquals(config, coordinator.jobModel.getConfig) assertEquals(jobModel, coordinator.jobModel) http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java index dbe534f..9c0dea7 100644 --- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java +++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java @@ -279,7 +279,7 @@ public class TestApplicationMasterRestClient { Set<ContainerModel> containerModels = grouper.group(taskModels); HashMap<String, ContainerModel> containers = new HashMap<>(); for (ContainerModel containerModel : containerModels) { - containers.put(containerModel.getProcessorId(), containerModel); + containers.put(containerModel.getId(), containerModel); } return containers; }
