SAMZA-1879: Remove deprecated containerId from ContainerModel

Author: Cameron Lee <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Shanthoosh Venkatraman 
<[email protected]>

Closes #639 from cameronlee314/remove_container_id


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3bb24c8e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3bb24c8e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3bb24c8e

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 3bb24c8eec40099dbc177df7c77f2793d7d70653
Parents: 160927a
Author: Cameron Lee <[email protected]>
Authored: Wed Sep 19 12:18:46 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Sep 19 12:18:46 2018 -0700

----------------------------------------------------------------------
 .../grouper/task/GroupByContainerCount.java     |   6 +-
 .../grouper/task/GroupByContainerIds.java       |   4 +-
 .../task/SingleContainerGrouperFactory.java     |   2 +-
 .../apache/samza/job/model/ContainerModel.java  |  31 +--
 .../org/apache/samza/job/model/JobModel.java    |   2 -
 .../model/JsonContainerModelMixIn.java          |  38 ++--
 .../serializers/model/JsonJobModelMixIn.java    |   2 +
 .../serializers/model/JsonTaskModelMixIn.java   |   2 +
 .../serializers/model/SamzaObjectMapper.java    |  41 ++--
 .../apache/samza/storage/StorageRecovery.java   |   2 +-
 .../samza/coordinator/JobModelManager.scala     |   2 +-
 .../grouper/task/TestGroupByContainerCount.java |  89 ++++----
 .../grouper/task/TestGroupByContainerIds.java   |  37 ++--
 .../samza/container/mock/ContainerMocks.java    |   6 +-
 .../coordinator/JobModelManagerTestUtil.java    |  13 +-
 .../apache/samza/job/model/TestJobModel.java    |   4 +-
 .../operators/impl/TestOperatorImplGraph.java   |   8 +-
 .../model/TestSamzaObjectMapper.java            | 205 +++++++++++++++----
 .../samza/container/TestSamzaContainer.scala    |  12 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  12 +-
 .../webapp/TestApplicationMasterRestClient.java |   2 +-
 21 files changed, 310 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 74c69d6..b4d6c90 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -78,7 +78,7 @@ public class GroupByContainerCount implements 
BalancingTaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(String.valueOf(i), i, 
taskGroups[i]));
+      containerModels.add(new ContainerModel(String.valueOf(i), 
taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);
@@ -194,7 +194,7 @@ public class GroupByContainerCount implements 
BalancingTaskNameGrouper {
   private void saveTaskAssignments(Set<ContainerModel> containers, 
TaskAssignmentManager taskAssignmentManager) {
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), 
container.getProcessorId());
+        
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), 
container.getId());
       }
     }
   }
@@ -301,7 +301,7 @@ public class GroupByContainerCount implements 
BalancingTaskNameGrouper {
         containerTaskModels.put(model.getTaskName(), model);
       }
       containerModels.add(
-          new ContainerModel(container.containerId, 
Integer.valueOf(container.containerId), containerTaskModels));
+          new ContainerModel(container.containerId, containerTaskModels));
     }
     return Collections.unmodifiableSet(containerModels);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index f5a5a86..9dab943 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -100,9 +100,7 @@ public class GroupByContainerIds implements TaskNameGrouper 
{
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      // containerId in ContainerModel constructor is set to -1 because 
processorId can be any string and does
-      // not have an integer equivalent. So, we set it to -1. After 0.13, this 
parameter will be removed.
-      containerModels.add(new ContainerModel(containersIds.get(i), -1, 
taskGroups[i]));
+      containerModels.add(new ContainerModel(containersIds.get(i), 
taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index 15cd224..ee962d5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper {
     for (TaskModel taskModel: taskModels) {
       taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
     }
-    ContainerModel containerModel = new ContainerModel(containerId, -1, 
taskNameTaskModelMap);
+    ContainerModel containerModel = new ContainerModel(containerId, 
taskNameTaskModelMap);
     return Collections.singleton(containerModel);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java 
b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index bd4fa94..980806b 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -19,10 +19,9 @@
 
 package org.apache.samza.job.model;
 
-import org.apache.samza.container.TaskName;
-
 import java.util.Collections;
 import java.util.Map;
+import org.apache.samza.container.TaskName;
 
 /**
  * <p>
@@ -41,28 +40,16 @@ import java.util.Map;
  * </p>
  */
 public class ContainerModel {
-  @Deprecated
-  private final int containerId;
-  private final String processorId;
+  private final String id;
   private final Map<TaskName, TaskModel> tasks;
 
-  public ContainerModel(String processorId, int containerId, Map<TaskName, 
TaskModel> tasks) {
-    this.containerId = containerId;
-    if (processorId == null) {
-      this.processorId = String.valueOf(containerId);
-    } else {
-      this.processorId = processorId;
-    }
+  public ContainerModel(String id, Map<TaskName, TaskModel> tasks) {
+    this.id = id;
     this.tasks = Collections.unmodifiableMap(tasks);
   }
 
-  @Deprecated
-  public int getContainerId() {
-    return containerId;
-  }
-
-  public String getProcessorId() {
-    return processorId;
+  public String getId() {
+    return id;
   }
 
   public Map<TaskName, TaskModel> getTasks() {
@@ -71,14 +58,14 @@ public class ContainerModel {
 
   @Override
   public String toString() {
-    return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + 
"]";
+    return "ContainerModel [id=" + id + ", tasks=" + tasks + "]";
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + ((processorId == null) ? 0 : 
processorId.hashCode());
+    result = prime * result + ((id == null) ? 0 : id.hashCode());
     result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
     return result;
   }
@@ -92,7 +79,7 @@ public class ContainerModel {
     if (getClass() != obj.getClass())
       return false;
     ContainerModel other = (ContainerModel) obj;
-    if (!processorId.equals(other.processorId))
+    if (!id.equals(other.id))
       return false;
     if (tasks == null) {
       if (other.tasks != null)

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 
b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index d2f8fda..be26f10 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
 /**
  * <p>
@@ -39,7 +38,6 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties;
  * an id, partition information, etc.
  * </p>
  */
-@JsonIgnoreProperties(ignoreUnknown = true)
 public class JobModel {
   private static final String EMPTY_STRING = "";
   private final Config config;

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
index e19afec..f18c42a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -19,29 +19,35 @@
 
 package org.apache.samza.serializers.model;
 
+import java.util.Map;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskModel;
-import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
 
-import java.util.Map;
-
 /**
- * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
+ * A mix-in Jackson class to convert {@link 
org.apache.samza.job.model.ContainerModel} to JSON.
+ * Notes:
+ * 1) Constructor is not needed because this mixin is not used for 
deserialization. See {@link SamzaObjectMapper}.
+ * 2) It is unnecessary to use {@link 
org.codehaus.jackson.annotate.JsonIgnoreProperties#ignoreUnknown()} here since
+ * {@link SamzaObjectMapper} already uses custom deserialization code for the
+ * {@link org.apache.samza.job.model.ContainerModel}.
+ * 3) See {@link SamzaObjectMapper} for more context about why the JSON keys 
are named in this specific way.
  */
 public abstract class JsonContainerModelMixIn {
-  @JsonCreator
-  public JsonContainerModelMixIn(@JsonProperty("processor-id") String 
processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
-  }
-
-  @Deprecated
-  @JsonProperty("container-id")
-  abstract int getContainerId();
-
-  @JsonProperty("processor-id")
-  abstract String getProcessorId();
-
-  @JsonProperty("tasks")
+  /**
+   * This is intentionally not "id" for backwards compatibility reasons. See 
{@link SamzaObjectMapper} for more details.
+   */
+  static final String PROCESSOR_ID_KEY = "processor-id";
+  /**
+   * This is used for backwards compatibility. See {@link SamzaObjectMapper} 
for more details.
+   */
+  static final String CONTAINER_ID_KEY = "container-id";
+  static final String TASKS_KEY = "tasks";
+
+  @JsonProperty(PROCESSOR_ID_KEY)
+  abstract String getId();
+
+  @JsonProperty(TASKS_KEY)
   abstract Map<TaskName, TaskModel> getTasks();
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
index 4b0c404..c40173a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -23,11 +23,13 @@ import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.job.model.ContainerModel;
 import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 /**
  * A mix-in Jackson class to convert Samza's JobModel to/from JSON.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class JsonJobModelMixIn {
   @JsonCreator
   public JsonJobModelMixIn(@JsonProperty("config") Config config, 
@JsonProperty("containers") Map<String, ContainerModel> containers) {

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 3ebe391..13a7d59 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -25,11 +25,13 @@ import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
 import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 /**
  * A mix-in Jackson class to convert Samza's TaskModel to/from JSON.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class JsonTaskModelMixIn {
   @JsonCreator
   public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, 
@JsonProperty("system-stream-partitions") Set<SystemStreamPartition> 
systemStreamPartitions, @JsonProperty("changelog-partition") Partition 
changelogPartition) {

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 53b59b2..15206e1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.serializers.model;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -48,10 +51,6 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod;
 import org.codehaus.jackson.map.module.SimpleModule;
 import org.codehaus.jackson.type.TypeReference;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * <p>
  * A collection of utility classes and (de)serializers to make Samza's job 
model
@@ -59,7 +58,7 @@ import java.util.Map;
  * Jackson-specific code is isolated so that Samza's core data model does not
  * require a direct dependency on Jackson.
  * </p>
- * 
+ *
  * <p>
  * To use Samza's job data model, use the SamzaObjectMapper.getObjectMapper()
  * method.
@@ -99,19 +98,31 @@ public class SamzaObjectMapper {
       public ContainerModel deserialize(JsonParser jp, DeserializationContext 
ctxt) throws IOException, JsonProcessingException {
         ObjectCodec oc = jp.getCodec();
         JsonNode node = oc.readTree(jp);
-        int containerId = node.get("container-id").getIntValue();
-        if (node.get("container-id") == null) {
-          throw new SamzaException("JobModel did not contain a container-id. 
This can never happen. JobModel corrupt!");
-        }
-        String processorId;
-        if (node.get("processor-id") == null) {
-          processorId = String.valueOf(containerId);
+        /*
+         * Before Samza 0.13, "container-id" was used.
+         * In Samza 0.13, "processor-id" was added to be the id to use and 
"container-id" was deprecated. However,
+         * "container-id" still needed to be checked for backwards 
compatibility in case "processor-id" was missing
+         * (i.e. from a job model corresponding to a version of the job that 
was on a pre Samza 0.13 version).
+         * In Samza 1.0, "container-id" was further cleaned up from 
ContainerModel. This logic is still being left here
+         * as a fallback for backwards compatibility with pre Samza 0.13. 
ContainerModel.getProcessorId was changed to
+         * ContainerModel.getId in the Java API, but "processor-id" still 
needs to be used as the JSON key for backwards
+         * compatibility with Samza 0.13 and Samza 0.14.
+         */
+        String id;
+        if (node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY) == null) {
+          if (node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY) == null) {
+            throw new SamzaException(
+                String.format("JobModel was missing %s and %s. This should 
never happen. JobModel corrupt!",
+                    JsonContainerModelMixIn.PROCESSOR_ID_KEY, 
JsonContainerModelMixIn.CONTAINER_ID_KEY));
+          }
+          id = 
String.valueOf(node.get(JsonContainerModelMixIn.CONTAINER_ID_KEY).getIntValue());
         } else {
-          processorId = node.get("processor-id").getTextValue();
+          id = 
node.get(JsonContainerModelMixIn.PROCESSOR_ID_KEY).getTextValue();
         }
         Map<TaskName, TaskModel> tasksMapping =
-            OBJECT_MAPPER.readValue(node.get("tasks"), new 
TypeReference<Map<TaskName, TaskModel>>() { });
-        return new ContainerModel(processorId, containerId, tasksMapping);
+            
OBJECT_MAPPER.readValue(node.get(JsonContainerModelMixIn.TASKS_KEY),
+                new TypeReference<Map<TaskName, TaskModel>>() { });
+        return new ContainerModel(id, tasksMapping);
       }
     });
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index f9c6c0c..bf46018 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -209,7 +209,7 @@ public class StorageRecovery extends CommandLine {
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, 
StorageEngine>();
-      SamzaContainerContext containerContext = new 
SamzaContainerContext(containerModel.getProcessorId(), jobConfig, 
containerModel.getTasks()
+      SamzaContainerContext containerContext = new 
SamzaContainerContext(containerModel.getId(), jobConfig, 
containerModel.getTasks()
           .keySet(), new MetricsRegistryMap());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f939736..f7698c0 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -208,7 +208,7 @@ object JobModelManager extends Logging {
         case _ => containerGrouper.group(taskModels.asJava, containerIds)
       }
     }
-    val containerMap = containerModels.asScala.map { case (containerModel) => 
containerModel.getProcessorId -> containerModel }.toMap
+    val containerMap = containerModels.asScala.map { case (containerModel) => 
containerModel.getId -> containerModel }.toMap
 
     if (isHostAffinityEnabled) {
       new JobModel(config, containerMap.asJava, localityManager)

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index e89d673..8d2d394 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -18,33 +18,22 @@
  */
 package org.apache.samza.container.grouper.task;
 
-import org.apache.samza.SamzaException;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.TaskModel;
-import org.junit.Before;
-import org.junit.Test;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.junit.Before;
+import org.junit.Test;
 
-import static 
org.apache.samza.container.mock.ContainerMocks.generateTaskContainerMapping;
-import static 
org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
-import static org.apache.samza.container.mock.ContainerMocks.getTaskModel;
-import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.anyCollection;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.apache.samza.container.mock.ContainerMocks.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestGroupByContainerCount {
   private TaskAssignmentManager taskAssignmentManager;
@@ -83,7 +72,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -91,8 +80,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -110,7 +99,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -118,8 +107,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(11, container0.getTasks().size());
     assertEquals(10, container1.getTasks().size());
 
@@ -182,7 +171,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(4, containers.size());
@@ -194,8 +183,8 @@ public class TestGroupByContainerCount {
     assertNotNull(container1);
     assertNotNull(container2);
     assertNotNull(container3);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertEquals(2, container2.getTasks().size());
@@ -264,7 +253,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -272,8 +261,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -343,7 +332,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -351,8 +340,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -397,7 +386,7 @@ public class TestGroupByContainerCount {
 
     containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(3, containers.size());
@@ -407,9 +396,9 @@ public class TestGroupByContainerCount {
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
-    assertEquals("2", container2.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
+    assertEquals("2", container2.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(3, container1.getTasks().size());
     assertEquals(3, container2.getTasks().size());
@@ -474,7 +463,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -482,8 +471,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -544,7 +533,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -552,8 +541,8 @@ public class TestGroupByContainerCount {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(6, container0.getTasks().size());
     assertEquals(3, container1.getTasks().size());
 
@@ -610,7 +599,7 @@ public class TestGroupByContainerCount {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(3, containers.size());
@@ -620,9 +609,9 @@ public class TestGroupByContainerCount {
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
-    assertEquals("2", container2.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
+    assertEquals("2", container2.getId());
     assertEquals(2, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertEquals(2, container1.getTasks().size());
@@ -782,7 +771,7 @@ public class TestGroupByContainerCount {
     Set<ContainerModel> prevContainers = new HashSet<>();
     taskModels.forEach(model -> {
         prevContainers.add(
-          new ContainerModel(UUID.randomUUID().toString(), -1, 
Collections.singletonMap(model.getTaskName(), model)));
+          new ContainerModel(UUID.randomUUID().toString(), 
Collections.singletonMap(model.getTaskName(), model)));
       });
     Map<String, String> prevTaskToContainerMapping = 
generateTaskContainerMapping(prevContainers);
     
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index 13afeef..b9fe6fb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -38,13 +38,9 @@ import org.apache.samza.job.model.TaskModel;
 import org.junit.Before;
 import org.junit.Test;
 
-import static 
org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
-import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.apache.samza.container.mock.ContainerMocks.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestGroupByContainerIds {
@@ -91,7 +87,7 @@ public class TestGroupByContainerIds {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -99,8 +95,8 @@ public class TestGroupByContainerIds {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -118,7 +114,7 @@ public class TestGroupByContainerIds {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -126,8 +122,8 @@ public class TestGroupByContainerIds {
     ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
+    assertEquals("0", container0.getId());
+    assertEquals("1", container1.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -159,7 +155,7 @@ public class TestGroupByContainerIds {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -167,8 +163,8 @@ public class TestGroupByContainerIds {
     ContainerModel container1 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("4", container0.getProcessorId());
-    assertEquals("2", container1.getProcessorId());
+    assertEquals("4", container0.getId());
+    assertEquals("2", container1.getId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -195,7 +191,7 @@ public class TestGroupByContainerIds {
 
     Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
+      containersMap.put(container.getId(), container);
     }
 
     assertEquals(2, containers.size());
@@ -203,8 +199,8 @@ public class TestGroupByContainerIds {
     ContainerModel container1 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals("4", container0.getProcessorId());
-    assertEquals("2", container1.getProcessorId());
+    assertEquals("4", container0.getId());
+    assertEquals("2", container1.getId());
     assertEquals(11, container0.getTasks().size());
     assertEquals(10, container1.getTasks().size());
 
@@ -238,14 +234,13 @@ public class TestGroupByContainerIds {
   public void testFewerTasksThanContainers() {
     final String testContainerId1 = "1";
     final String testContainerId2 = "2";
-    final int testProcessorId = 1;
 
     Set<TaskModel> taskModels = generateTaskModels(1);
     List<String> containerIds = ImmutableList.of(testContainerId1, 
testContainerId2);
 
     Map<TaskName, TaskModel> expectedTasks = taskModels.stream()
                                                        
.collect(Collectors.toMap(TaskModel::getTaskName, x -> x));
-    ContainerModel expectedContainerModel = new 
ContainerModel(testContainerId1, testProcessorId, expectedTasks);
+    ContainerModel expectedContainerModel = new 
ContainerModel(testContainerId1, expectedTasks);
 
     Set<ContainerModel> actualContainerModels = 
buildSimpleGrouper().group(taskModels, containerIds);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java 
b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
index 9369f4b..ca9def2 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
@@ -56,7 +56,7 @@ public class ContainerMocks {
     Set<ContainerModel> containers = generateContainerModels(numContainers, 
taskCount);
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        mapping.put(taskName.getTaskName(), container.getProcessorId());
+        mapping.put(taskName.getTaskName(), container.getId());
       }
     }
     return mapping;
@@ -78,7 +78,7 @@ public class ContainerMocks {
     for (int partition : partitions) {
       tasks.put(getTaskName(partition), getTaskModel(partition));
     }
-    return new ContainerModel(containerId, -1, tasks);
+    return new ContainerModel(containerId, tasks);
   }
 
   public static Set<TaskModel> generateTaskModels(int[] partitions) {
@@ -121,7 +121,7 @@ public class ContainerMocks {
     Map<String, String> taskMapping = new HashMap<>();
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        taskMapping.put(taskName.getTaskName(), container.getProcessorId());
+        taskMapping.put(taskName.getTaskName(), container.getId());
       }
     }
     return taskMapping;

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index b7514c4..ea25ec1 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -19,20 +19,17 @@
 
 package org.apache.samza.coordinator;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.StreamMetadataCache;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Utils to create instances of {@link JobModelManager} in unit tests
  */
@@ -45,7 +42,7 @@ public class JobModelManagerTestUtil {
   public static JobModelManager getJobModelManagerWithLocalityManager(Config 
config, int containerCount, LocalityManager localityManager, HttpServer server) 
{
     Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(String.valueOf(i), i, new 
HashMap<TaskName, TaskModel>());
+      ContainerModel container = new ContainerModel(String.valueOf(i), new 
HashMap<>());
       containers.put(String.valueOf(i), container);
     }
     JobModel jobModel = new JobModel(config, containers, localityManager);

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java 
b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java
index 6c7c282..77fe639 100644
--- a/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java
+++ b/samza-core/src/test/java/org/apache/samza/job/model/TestJobModel.java
@@ -41,8 +41,8 @@ public class TestJobModel {
         new TaskName("t3"), new TaskModel(new TaskName("t3"), 
ImmutableSet.of(), new Partition(2)),
         new TaskName("t4"), new TaskModel(new TaskName("t4"), 
ImmutableSet.of(), new Partition(3)),
         new TaskName("t5"), new TaskModel(new TaskName("t5"), 
ImmutableSet.of(), new Partition(4)));
-    ContainerModel containerModel1 = new ContainerModel("0", 0, 
tasksForContainer1);
-    ContainerModel containerModel2 = new ContainerModel("1", 1, 
tasksForContainer2);
+    ContainerModel containerModel1 = new ContainerModel("0", 
tasksForContainer1);
+    ContainerModel containerModel2 = new ContainerModel("1", 
tasksForContainer2);
     Map<String, ContainerModel> containers = ImmutableMap.of("0", 
containerModel1, "1", containerModel2);
     JobModel jobModel = new JobModel(config, containers);
     assertEquals(jobModel.maxChangeLogStreamPartitions, 5);

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 6f8a8bc..bd9b2d6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -552,13 +552,13 @@ public class TestOperatorImplGraph {
     ssps.add(ssp0);
     ssps.add(ssp2);
     TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0));
-    ContainerModel cm0 = new ContainerModel("c0", 0, 
Collections.singletonMap(task0, tm0));
+    ContainerModel cm0 = new ContainerModel("c0", 
Collections.singletonMap(task0, tm0));
     TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new 
Partition(1));
-    ContainerModel cm1 = new ContainerModel("c1", 1, 
Collections.singletonMap(task1, tm1));
+    ContainerModel cm1 = new ContainerModel("c1", 
Collections.singletonMap(task1, tm1));
 
     Map<String, ContainerModel> cms = new HashMap<>();
-    cms.put(cm0.getProcessorId(), cm0);
-    cms.put(cm1.getProcessorId(), cm1);
+    cms.put(cm0.getId(), cm0);
+    cms.put(cm1.getId(), cm1);
 
     JobModel jobModel = new JobModel(config, cms, null);
     Multimap<SystemStream, String> streamToTasks = 
OperatorImplGraph.getStreamToConsumerTasks(jobModel);

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 02c3a9d..0f90dd5 100644
--- 
a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -19,8 +19,13 @@
 
 package org.apache.samza.serializers.model;
 
-import junit.framework.Assert;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
@@ -29,76 +34,186 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStreamPartition;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import static org.junit.Assert.*;
 
-import static org.junit.Assert.assertEquals;
 
 public class TestSamzaObjectMapper {
   private JobModel jobModel;
+  private ObjectMapper samzaObjectMapper;
 
   @Before
-  public void setup() throws IOException {
-    Map<String, String> configMap = new HashMap<String, String>();
-    Set<SystemStreamPartition> ssp = new HashSet<>();
-    configMap.put("a", "b");
-    Config config = new MapConfig(configMap);
+  public void setup() {
+    Config config = new MapConfig(ImmutableMap.of("a", "b"));
     TaskName taskName = new TaskName("test");
-    ssp.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
-    TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2));
-    Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
-    tasks.put(taskName, taskModel);
-    ContainerModel containerModel = new ContainerModel("1", 1, tasks);
-    Map<String, ContainerModel> containerMap = new HashMap<String, 
ContainerModel>();
-    containerMap.put("1", containerModel);
-    jobModel = new JobModel(config, containerMap);
+    Set<SystemStreamPartition> ssps = ImmutableSet.of(new 
SystemStreamPartition("foo", "bar", new Partition(1)));
+    TaskModel taskModel = new TaskModel(taskName, ssps, new Partition(2));
+    Map<TaskName, TaskModel> tasks = ImmutableMap.of(taskName, taskModel);
+    ContainerModel containerModel = new ContainerModel("1", tasks);
+    Map<String, ContainerModel> containerMap = ImmutableMap.of("1", 
containerModel);
+    this.jobModel = new JobModel(config, containerMap);
+    this.samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
   }
 
   @Test
-  public void testJsonTaskModel() throws Exception {
-    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  public void testSerializeJobModel() throws IOException {
+    String serializedString = 
this.samzaObjectMapper.writeValueAsString(this.jobModel);
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    ObjectNode serializedAsJson = (ObjectNode) new 
ObjectMapper().readTree(serializedString);
+    ObjectNode expectedJson = buildJobModelJson();
+
+    /*
+     * Jackson serializes all get* methods even if they aren't regular 
getters. We only care about certain fields now
+     * since those are the only ones that get deserialized.
+     */
+    assertEquals(expectedJson.get("config"), serializedAsJson.get("config"));
+    assertEquals(expectedJson.get("containers"), 
serializedAsJson.get("containers"));
+  }
+
+  @Test
+  public void testDeserializeJobModel() throws IOException {
+    ObjectNode asJson = buildJobModelJson();
+    assertEquals(this.jobModel, deserializeFromObjectNode(asJson));
+  }
 
-    String str = mapper.writeValueAsString(jobModel);
-    JobModel obj = mapper.readValue(str, JobModel.class);
-    assertEquals(jobModel, obj);
+  /**
+   * Deserialization should not fail if there are fields which are ignored.
+   */
+  @Test
+  public void testDeserializeWithIgnoredFields() throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    // JobModel ignores all unknown fields
+    jobModelJson.put("unknown_job_model_key", "unknown_job_model_value");
+    ObjectNode taskPartitionMappings = new ObjectMapper().createObjectNode();
+    taskPartitionMappings.put("1", (Integer) null);
+    // old key that used to be serialized
+    jobModelJson.put("task-partition-mappings", taskPartitionMappings);
+    ObjectNode allContainerLocality = new ObjectMapper().createObjectNode();
+    allContainerLocality.put("1", (Integer) null);
+    // currently gets serialized since there is a getAllContainerLocality
+    jobModelJson.put("all-container-locality", allContainerLocality);
+    assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson));
   }
 
   /**
-   * Critical test to guarantee compatibility between samza 0.12 container 
models and 0.13+
-   *
-   * Samza 0.12 contains only "container-id" (integer) in the ContainerModel. 
"processor-id" (String) is added in 0.13.
-   * When serializing, we serialize both the fields in 0.13. Deserialization 
correctly handles the fields in 0.13.
+   * Given a {@link ContainerModel} JSON with a processor-id and a 
container-id, deserialization should properly ignore
+   * the container-id.
    */
   @Test
-  public void testContainerModelCompatible() throws Exception {
-    String newJobModelString = 
"{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
-    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
-    JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+  public void testDeserializeContainerIdAndProcessorId() throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode containerModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1");
+    containerModelJson.put("container-id", 123);
+    assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson));
+  }
 
-    String oldJobModelString = 
"{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
-    ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
-    JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+  /**
+   * Given a {@link ContainerModel} JSON with an unknown field, 
deserialization should properly ignore it.
+   */
+  @Test
+  public void testDeserializeUnknownContainerModelField() throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode containerModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1");
+    containerModelJson.put("unknown_container_model_key", 
"unknown_container_model_value");
+    assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson));
+  }
 
-    Assert.assertEquals(jobModel, jobModel1);
+  /**
+   * Given a {@link ContainerModel} JSON without a processor-id but with a 
container-id, deserialization should use the
+   * container-id to calculate the processor-id.
+   */
+  @Test
+  public void testDeserializeContainerModelOnlyContainerId() throws 
IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode containerModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1");
+    containerModelJson.remove("processor-id");
+    containerModelJson.put("container-id", 1);
+    assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson));
   }
 
+  /**
+   * Given a {@link TaskModel} JSON with an unknown field, 
deserialization should properly ignore it.
+   */
   @Test
-  public void testUnknownFieldsInJobModelJsonDoesNotFailDeserialization() 
throws Exception {
-    String newJobModelString = 
"{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null},
 \"task-partition-mapping\":{\"1\":null}}";
-    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
-    JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+  public void testDeserializeUnknownTaskModelField() throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode taskModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1").get("tasks").get("test");
+    taskModelJson.put("unknown_task_model_key", "unknown_task_model_value");
+    assertEquals(this.jobModel, deserializeFromObjectNode(jobModelJson));
+  }
 
-    String oldJobModelString = 
"{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
-    ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
-    JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+  /**
+   * Given a {@link ContainerModel} JSON with neither a processor-id nor a 
container-id, deserialization should fail.
+   */
+  @Test(expected = SamzaException.class)
+  public void testDeserializeContainerModelMissingProcessorIdAndContainerId() 
throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode containerModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1");
+    containerModelJson.remove("processor-id");
+    deserializeFromObjectNode(jobModelJson);
+  }
 
-    Assert.assertEquals(jobModel, jobModel1);
+  /**
+   * Given a {@link ContainerModel} JSON with only an "id" field, 
deserialization should fail.
+   * This verifies that even though {@link ContainerModel} has a getId method, 
the "id" field is not used, since
+   * "processor-id" is the field that is supposed to be used.
+   */
+  @Test(expected = SamzaException.class)
+  public void testDeserializeContainerModelIdFieldOnly() throws IOException {
+    ObjectNode jobModelJson = buildJobModelJson();
+    ObjectNode containerModelJson = (ObjectNode) 
jobModelJson.get("containers").get("1");
+    containerModelJson.remove("processor-id");
+    containerModelJson.put("id", 1);
+    deserializeFromObjectNode(jobModelJson);
   }
 
+  private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws 
IOException {
+    // use plain ObjectMapper to get JSON string
+    String jsonString = new ObjectMapper().writeValueAsString(jobModelJson);
+    return this.samzaObjectMapper.readValue(jsonString, JobModel.class);
+  }
+
+  /**
+   * Builds {@link ObjectNode} which matches the {@link JobModel} built in 
setup.
+   */
+  private static ObjectNode buildJobModelJson() {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    ObjectNode configJson = objectMapper.createObjectNode();
+    configJson.put("a", "b");
+
+    ObjectNode containerModel1TaskTestSSPJson = 
objectMapper.createObjectNode();
+    containerModel1TaskTestSSPJson.put("system", "foo");
+    containerModel1TaskTestSSPJson.put("stream", "bar");
+    containerModel1TaskTestSSPJson.put("partition", 1);
+
+    ArrayNode containerModel1TaskTestSSPsJson = objectMapper.createArrayNode();
+    containerModel1TaskTestSSPsJson.add(containerModel1TaskTestSSPJson);
+
+    ObjectNode containerModel1TaskTestJson = objectMapper.createObjectNode();
+    containerModel1TaskTestJson.put("task-name", "test");
+    containerModel1TaskTestJson.put("system-stream-partitions", 
containerModel1TaskTestSSPsJson);
+    containerModel1TaskTestJson.put("changelog-partition", 2);
+
+    ObjectNode containerModel1TasksJson = objectMapper.createObjectNode();
+    containerModel1TasksJson.put("test", containerModel1TaskTestJson);
+
+    ObjectNode containerModel1Json = objectMapper.createObjectNode();
+    // important: needs to be "processor-id" for compatibility between Samza 
0.14 and 1.0
+    containerModel1Json.put("processor-id", "1");
+    containerModel1Json.put("tasks", containerModel1TasksJson);
+
+    ObjectNode containersJson = objectMapper.createObjectNode();
+    containersJson.put("1", containerModel1Json);
+
+    ObjectNode jobModelJson = objectMapper.createObjectNode();
+    jobModelJson.put("config", configJson);
+    jobModelJson.put("containers", containersJson);
+
+    return jobModelJson;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 30ca8c1..57c0bf0 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -181,8 +181,8 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), 
offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), 
offsets.keySet(), new Partition(0)))
     val containers = Map(
-      "0" -> new ContainerModel("0", 0, tasks),
-      "1" -> new ContainerModel("1", 0, tasks))
+      "0" -> new ContainerModel("0", tasks),
+      "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
@@ -206,8 +206,8 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), 
offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), 
offsets.keySet(), new Partition(0)))
     val containers = Map(
-      "0" -> new ContainerModel("0", 0, tasks),
-      "1" -> new ContainerModel("1", 1, tasks))
+      "0" -> new ContainerModel("0", tasks),
+      "1" -> new ContainerModel("1", tasks))
     val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
@@ -234,7 +234,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     val taskModel1 = new TaskModel(taskName1,
       Set(new SystemStreamPartition("input", "stream", new Partition(1))),
       new Partition(11))
-    val containerModel = new ContainerModel("processorId", 0, Map(taskName0 -> 
taskModel0, taskName1 -> taskModel1))
+    val containerModel = new ContainerModel("processorId", Map(taskName0 -> 
taskModel0, taskName1 -> taskModel1))
     val changeLogSystemStreams = Map("store0" -> new 
SystemStream("changelogSystem0", "store0-changelog"),
       "store1" -> new SystemStream("changelogSystem1", "store1-changelog"))
     val expected = Set(new SystemStreamPartition("changelogSystem0", 
"store0-changelog", new Partition(10)),
@@ -254,7 +254,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     val taskModel1 = new TaskModel(taskName1,
       Set(new SystemStreamPartition("input", "stream", new Partition(1))),
       new Partition(11))
-    val containerModel = new ContainerModel("processorId", 0, Map(taskName0 -> 
taskModel0, taskName1 -> taskModel1))
+    val containerModel = new ContainerModel("processorId", Map(taskName0 -> 
taskModel0, taskName1 -> taskModel1))
     assertEquals(Set(), 
SamzaContainer.getChangelogSSPsForContainer(containerModel, Map()))
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 42610ae..2488355 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -69,8 +69,8 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
     val container1Tasks = Map(
       task1Name -> new TaskModel(task1Name, checkpoint1.keySet.asJava, new 
Partition(3)))
     val containers = Map(
-      "0" -> new ContainerModel("0", 0, container0Tasks.asJava),
-      "1" -> new ContainerModel("1", 1, container1Tasks.asJava))
+      "0" -> new ContainerModel("0", container0Tasks.asJava),
+      "1" -> new ContainerModel("1", container1Tasks.asJava))
 
 
     // The test does not pass offsets for task2 (Partition 2) to the 
checkpointmanager, this will verify that we get an offset 0 for this partition
@@ -137,8 +137,8 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
     val container1Tasks = Map(
       task1Name -> new TaskModel(task1Name, ssp1.asJava, new Partition(3)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel("0", 0, container0Tasks.asJava),
-      Integer.valueOf(1) -> new ContainerModel("1", 1, container1Tasks.asJava))
+      Integer.valueOf(0) -> new ContainerModel("0", container0Tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel("1", container1Tasks.asJava))
     val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX 
+ "mock:" + task0Name.getTaskName() -> "4"
 
     // Configs which are processed by the MockCoordinatorStream as special 
configs which are interpreted as
@@ -196,7 +196,7 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
     val container0Tasks = Map(
       task1Name -> new TaskModel(task1Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new 
Partition(0)))
     val containers = Map(
-      "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
+      "0" -> new ContainerModel("0", container0Tasks.asJava))
     val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)
@@ -218,7 +218,7 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
       task1Name -> new TaskModel(task1Name, Set(new 
SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new 
Partition(0)))
 
     val containers = Map(
-      "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
+      "0" -> new ContainerModel("0", container0Tasks.asJava))
     val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)

http://git-wip-us.apache.org/repos/asf/samza/blob/3bb24c8e/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index dbe534f..9c0dea7 100644
--- 
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -279,7 +279,7 @@ public class TestApplicationMasterRestClient {
     Set<ContainerModel> containerModels = grouper.group(taskModels);
     HashMap<String, ContainerModel> containers = new HashMap<>();
     for (ContainerModel containerModel : containerModels) {
-      containers.put(containerModel.getProcessorId(), containerModel);
+      containers.put(containerModel.getId(), containerModel);
     }
     return containers;
   }

Reply via email to