This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1169b31  [SAMZA-2667] Refactor logic in JobModelManager so it is 
easier to use certain parts of it (#1515)
1169b31 is described below

commit 1169b316dd51a6c972c7fc7ab77ac7e34ba5fbde
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 23 16:57:54 2021 -0700

    [SAMZA-2667] Refactor logic in JobModelManager so it is easier to use 
certain parts of it (#1515)
    
    Changes:
    1. Extract JobModelManager.readJobModel into a separate class 
JobModelCalculator which is only responsible for calculating JobModel. Convert 
this logic from scala to java.
    2. Extract some of the logic of JobModelManager.apply (get grouper 
metadata, update task assignments) into a separate class JobModelHelper so that 
logic is not tied to HttpServer for communication. Convert this logic from 
scala to java.
    3. Fix a bug regarding deletion of standby task assignments. Assignments 
should get cleared when the old set of standby tasks differs from the new set 
of standby tasks, but that was not happening because the condition checked for 
equality instead of inequality and compared objects of different types. This 
bug fix is the only expected functionality change.
    4. Add more tests around the job model logic.
    
    API changes and upgrade/usage instructions: N/A
---
 .../samza/coordinator/AzureJobCoordinator.java     |   3 +-
 .../samza/coordinator/JobModelCalculator.java      | 240 ++++++++++
 .../apache/samza/coordinator/JobModelHelper.java   | 217 +++++++++
 .../standalone/PassthroughJobCoordinator.java      |   5 +-
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |   5 +-
 .../apache/samza/coordinator/JobModelManager.scala | 356 +--------------
 .../samza/coordinator/TestJobModelCalculator.java  | 487 +++++++++++++++++++++
 .../samza/coordinator/TestJobModelHelper.java      | 472 ++++++++++++++++++++
 .../samza/coordinator/TestJobModelManager.java     | 268 ------------
 ...Coordinator.scala => TestJobModelManager.scala} |  37 +-
 10 files changed, 1440 insertions(+), 650 deletions(-)

diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 08e8124..710eefb 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -367,7 +367,8 @@ public class AzureJobCoordinator implements JobCoordinator {
     // Generate the new JobModel
     GrouperMetadata grouperMetadata = new 
GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap());
     JobModel newJobModel =
-        JobModelManager.readJobModel(this.config, Collections.emptyMap(), 
streamMetadataCache, grouperMetadata);
+        JobModelCalculator.INSTANCE.calculateJobModel(this.config, 
Collections.emptyMap(), streamMetadataCache,
+            grouperMetadata);
     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + 
nextJMVersion);
 
     // Publish the new job model
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java
new file mode 100644
index 0000000..09597f0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SSPGrouperProxy;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
+import org.apache.samza.container.grouper.task.TaskNameGrouperProxy;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionMatcher;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+public class JobModelCalculator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobModelCalculator.class);
+  public static final JobModelCalculator INSTANCE = new JobModelCalculator();
+
+  private JobModelCalculator() { // private: the only instance is the INSTANCE constant of this class
+  }
+
+  /**
+   * Does the following:
+   * 1. Fetches metadata of the input streams defined in configuration through 
{@code streamMetadataCache}.
+   * 2. Applies the SSP grouper and task name grouper defined in the 
configuration to build the {@link JobModel}.
+   * @param originalConfig the configuration of the job.
+   * @param changeLogPartitionMapping the task to changelog partition mapping 
of the job.
+   * @param streamMetadataCache the cache that holds the partition metadata of 
the input streams.
+   * @param grouperMetadata provides the historical metadata of the 
application.
+   * @return the built {@link JobModel}.
+   */
+  public JobModel calculateJobModel(Config originalConfig, Map<TaskName, 
Integer> changeLogPartitionMapping,
+      StreamMetadataCache streamMetadataCache, GrouperMetadata 
grouperMetadata) {
+    // refresh config if enabled regex topic rewriter
+    Config refreshedConfig = refreshConfigByRegexTopicRewriter(originalConfig);
+
+    TaskConfig taskConfig = new TaskConfig(refreshedConfig);
+    // Do grouping to fetch TaskName to SSP mapping
+    Set<SystemStreamPartition> allSystemStreamPartitions =
+        getMatchedInputStreamPartitions(refreshedConfig, streamMetadataCache);
+
+    // processor list is required by some of the groupers. So, let's pass them 
as part of the config.
+    // Copy the config and add the processor list to the config copy.
+    // TODO: It is non-ideal to have config as a medium to transmit the 
locality information; especially, if the locality information evolves. Evaluate 
options on using context objects to pass dependent components.
+    Map<String, String> configMap = new HashMap<>(refreshedConfig);
+    configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", 
grouperMetadata.getProcessorLocality().keySet()));
+    SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper(new 
MapConfig(configMap));
+
+    JobConfig jobConfig = new JobConfig(refreshedConfig);
+
+    Map<TaskName, Set<SystemStreamPartition>> groups;
+    if (jobConfig.isSSPGrouperProxyEnabled()) {
+      SSPGrouperProxy sspGrouperProxy = new SSPGrouperProxy(refreshedConfig, 
grouper);
+      groups = sspGrouperProxy.group(allSystemStreamPartitions, 
grouperMetadata);
+    } else {
+      LOG.warn(String.format(
+          "SSPGrouperProxy is disabled (%s = false). Stateful jobs may produce 
erroneous results if this is not enabled.",
+          JobConfig.SSP_INPUT_EXPANSION_ENABLED));
+      groups = grouper.group(allSystemStreamPartitions);
+    }
+    LOG.info(String.format(
+        "SystemStreamPartitionGrouper %s has grouped the 
SystemStreamPartitions into %d tasks with the following taskNames: %s",
+        grouper, groups.size(), groups));
+
+    // If no mappings are present (first time the job is running) we return 
-1, this will allow 0 to be the first change
+    // mapping.
+    int maxChangelogPartitionId = 
changeLogPartitionMapping.values().stream().max(Comparator.naturalOrder()).orElse(-1);
+    // Sort the groups prior to assigning the changelog mapping so that the 
mapping is reproducible and intuitive
+    TreeMap<TaskName, Set<SystemStreamPartition>> sortedGroups = new 
TreeMap<>(groups);
+    Set<TaskModel> taskModels = new HashSet<>();
+    for (Map.Entry<TaskName, Set<SystemStreamPartition>> group : 
sortedGroups.entrySet()) {
+      TaskName taskName = group.getKey();
+      Set<SystemStreamPartition> systemStreamPartitions = group.getValue();
+      Optional<Integer> changelogPartitionId = 
Optional.ofNullable(changeLogPartitionMapping.get(taskName));
+      Partition changelogPartition;
+      if (changelogPartitionId.isPresent()) {
+        changelogPartition = new Partition(changelogPartitionId.get());
+      } else {
+        // If we've never seen this TaskName before, then assign it a new 
changelog partition.
+        maxChangelogPartitionId++;
+        LOG.info(
+            String.format("New task %s is being assigned changelog partition 
%s.", taskName, maxChangelogPartitionId));
+        changelogPartition = new Partition(maxChangelogPartitionId);
+      }
+      taskModels.add(new TaskModel(taskName, systemStreamPartitions, 
changelogPartition));
+    }
+
+    // Here is where we should put in a pluggable option for the 
SSPTaskNameGrouper for locality, load-balancing, etc.
+    TaskNameGrouperFactory containerGrouperFactory =
+        ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory(), 
TaskNameGrouperFactory.class);
+    boolean standbyTasksEnabled = jobConfig.getStandbyTasksEnabled();
+    int standbyTaskReplicationFactor = 
jobConfig.getStandbyTaskReplicationFactor();
+    TaskNameGrouperProxy taskNameGrouperProxy =
+        new 
TaskNameGrouperProxy(containerGrouperFactory.build(refreshedConfig), 
standbyTasksEnabled,
+            standbyTaskReplicationFactor);
+    Set<ContainerModel> containerModels;
+    boolean isHostAffinityEnabled = new 
ClusterManagerConfig(refreshedConfig).getHostAffinityEnabled();
+    if (isHostAffinityEnabled) {
+      containerModels = taskNameGrouperProxy.group(taskModels, 
grouperMetadata);
+    } else {
+      containerModels =
+          taskNameGrouperProxy.group(taskModels, new 
ArrayList<>(grouperMetadata.getProcessorLocality().keySet()));
+    }
+
+    Map<String, ContainerModel> containerMap =
+        
containerModels.stream().collect(Collectors.toMap(ContainerModel::getId, 
Function.identity()));
+    return new JobModel(refreshedConfig, containerMap);
+  }
+
+  /**
+   * Applies, in the configured order, every config rewriter whose configured class is
+   * {@link org.apache.samza.config.RegExTopicGenerator}. Rewriters of any other class are left unapplied.
+   *
+   * @param originalConfig Samza job config
+   * @return the refreshed config; this is {@code originalConfig} itself when no rewriters are configured
+   * @throws ConfigException if a configured rewriter name has no class config
+   */
+  private static Config refreshConfigByRegexTopicRewriter(Config originalConfig) {
+    JobConfig jobConfig = new JobConfig(originalConfig);
+    Optional<String> rewriterNames = jobConfig.getConfigRewriters();
+    if (!rewriterNames.isPresent()) {
+      return originalConfig;
+    }
+
+    String regexRewriterClassName = RegExTopicGenerator.class.getName();
+    Config currentConfig = originalConfig;
+    for (String name : rewriterNames.get().split(",")) {
+      Optional<String> configuredClass = jobConfig.getConfigRewriterClass(name);
+      if (!configuredClass.isPresent()) {
+        throw new ConfigException(String.format("Unable to find class config for config rewriter %s.", name));
+      }
+      if (regexRewriterClassName.equalsIgnoreCase(configuredClass.get())) {
+        currentConfig = ConfigUtil.applyRewriter(currentConfig, name);
+      }
+    }
+    return currentConfig;
+  }
+
+  /**
+   * Builds the input {@link SystemStreamPartition}s of the job based upon the {@code config} defined by the user.
+   * When an SSP matcher class is configured and the stream job factory class matches the configured regex, only the
+   * partitions accepted by the matcher are returned; otherwise all input partitions are returned.
+   *
+   * @param config configuration to fetch the metadata of the input streams.
+   * @param streamMetadataCache required to query the partition metadata of the input streams.
+   * @return the input SystemStreamPartitions of the job.
+   */
+  private static Set<SystemStreamPartition> getMatchedInputStreamPartitions(Config config,
+      StreamMetadataCache streamMetadataCache) {
+    Set<SystemStreamPartition> allPartitions = getInputStreamPartitions(config, streamMetadataCache);
+    JobConfig jobConfig = new JobConfig(config);
+    Optional<String> matcherClassName = jobConfig.getSSPMatcherClass();
+    if (!matcherClassName.isPresent()) {
+      return allPartitions;
+    }
+
+    String jobFactoryRegex = jobConfig.getSSPMatcherConfigJobFactoryRegex();
+    Optional<String> jobFactoryClass = jobConfig.getStreamJobFactoryClass();
+    boolean matcherApplies = jobFactoryClass.isPresent() && Pattern.matches(jobFactoryRegex, jobFactoryClass.get());
+    if (!matcherApplies) {
+      return allPartitions;
+    }
+
+    LOG.info(String.format("before match: allSystemStreamPartitions.size = %s", allPartitions.size()));
+    SystemStreamPartitionMatcher matcher =
+        ReflectionUtil.getObj(matcherClassName.get(), SystemStreamPartitionMatcher.class);
+    Set<SystemStreamPartition> matched = matcher.filter(allPartitions, config);
+    // Usually a small set hence ok to log at info level
+    LOG.info(String.format("after match: matchedPartitions = %s", matched));
+    return matched;
+  }
+
+  /**
+   * Reads the {@link SystemStreamPartitionGrouperFactory} class name from the {@code config}, instantiates the
+   * factory, and uses it to create the {@link SystemStreamPartitionGrouper}.
+   *
+   * @param config the configuration of the samza job.
+   * @return the instantiated {@link SystemStreamPartitionGrouper}.
+   */
+  private static SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    String grouperFactoryClassName = jobConfig.getSystemStreamPartitionGrouperFactory();
+    return ReflectionUtil.getObj(grouperFactoryClassName, SystemStreamPartitionGrouperFactory.class)
+        .getSystemStreamPartitionGrouper(config);
+  }
+
+  /**
+   * Computes the input system stream partitions of a samza job using the provided {@code config}
+   * and {@code streamMetadataCache}.
+   *
+   * @param config the configuration of the job.
+   * @param streamMetadataCache to query the partition metadata of the input streams.
+   * @return the input {@link SystemStreamPartition}s of the samza job.
+   */
+  private static Set<SystemStreamPartition> getInputStreamPartitions(Config config,
+      StreamMetadataCache streamMetadataCache) {
+    TaskConfig taskConfig = new TaskConfig(config);
+    // Get the set of partitions for each SystemStream from the stream metadata
+    Map<SystemStream, SystemStreamMetadata> metadataByStream = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(taskConfig.getInputStreams()).asScala().toSet(), true)).asJava();
+    Set<SystemStreamPartition> partitions = new HashSet<>();
+    for (Map.Entry<SystemStream, SystemStreamMetadata> streamEntry : metadataByStream.entrySet()) {
+      SystemStream stream = streamEntry.getKey();
+      // One SystemStreamPartition per partition reported in the stream's metadata
+      for (Partition partition : streamEntry.getValue().getSystemStreamPartitionMetadata().keySet()) {
+        partitions.add(new SystemStreamPartition(stream, partition));
+      }
+    }
+    return partitions;
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java
new file mode 100644
index 0000000..c122c37
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobModelHelper {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobModelHelper.class);
+
+  private final LocalityManager localityManager;
+  private final TaskAssignmentManager taskAssignmentManager;
+  private final TaskPartitionAssignmentManager taskPartitionAssignmentManager;
+  private final StreamMetadataCache streamMetadataCache;
+  private final JobModelCalculator jobModelCalculator;
+
+  /**
+   * Creates a helper which calculates a {@link JobModel} and persists the resulting task assignments.
+   *
+   * @param localityManager used to read the processor localities.
+   * @param taskAssignmentManager used to read, delete, and write the task to container assignments and task modes.
+   * @param taskPartitionAssignmentManager used to read, delete, and write the task to partition assignments.
+   * @param streamMetadataCache passed to the {@code jobModelCalculator} when calculating the job model.
+   * @param jobModelCalculator calculates the {@link JobModel}.
+   */
+  public JobModelHelper(LocalityManager localityManager, TaskAssignmentManager taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, StreamMetadataCache streamMetadataCache,
+      JobModelCalculator jobModelCalculator) {
+    this.jobModelCalculator = jobModelCalculator;
+    this.streamMetadataCache = streamMetadataCache;
+    this.taskPartitionAssignmentManager = taskPartitionAssignmentManager;
+    this.taskAssignmentManager = taskAssignmentManager;
+    this.localityManager = localityManager;
+  }
+
+  /**
+   * Calculates a new {@link JobModel} from the grouper metadata read through the managers of this helper, then
+   * updates the stored task assignments to reflect the calculated model.
+   *
+   * @param config the configuration of the job.
+   * @param changelogPartitionMapping the task to changelog partition mapping of the job.
+   * @return the calculated {@link JobModel}.
+   */
+  public JobModel newJobModel(Config config, Map<TaskName, Integer> changelogPartitionMapping) {
+    GrouperMetadata metadata =
+        getGrouperMetadata(config, this.localityManager, this.taskAssignmentManager,
+            this.taskPartitionAssignmentManager);
+    JobModel calculatedJobModel = this.jobModelCalculator.calculateJobModel(config, changelogPartitionMapping,
+        this.streamMetadataCache, metadata);
+    updateTaskAssignments(calculatedJobModel, this.taskAssignmentManager, this.taskPartitionAssignmentManager,
+        metadata);
+    return calculatedJobModel;
+  }
+
+  /**
+   * Builds the {@link GrouperMetadata} from the processor locality, the stored task to container assignments, and
+   * the stored task to partition assignments. Only active tasks (see {@code isActiveTask}) are included in the task
+   * based mappings.
+   *
+   * @param config the configuration of the job; used to calculate the processor locality.
+   * @param localityManager provides the stored processor localities.
+   * @param taskAssignmentManager provides the stored task modes and task to container assignments.
+   * @param taskPartitionAssignmentManager provides the stored partition to task assignments.
+   * @return the {@link GrouperMetadata} built from the stored data.
+   */
+  private GrouperMetadata getGrouperMetadata(Config config, LocalityManager localityManager,
+      TaskAssignmentManager taskAssignmentManager, TaskPartitionAssignmentManager taskPartitionAssignmentManager) {
+    Map<String, LocationId> processorLocality = getProcessorLocality(config, localityManager);
+    Map<TaskName, TaskMode> taskModes = taskAssignmentManager.readTaskModes();
+
+    Map<TaskName, String> taskNameToProcessorId = new HashMap<>();
+    Map<TaskName, LocationId> taskLocality = new HashMap<>();
+    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have no task-mode or have an active task mode
+    taskAssignmentManager.readTaskAssignment().forEach((taskNameString, containerId) -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        taskNameToProcessorId.put(taskName, containerId);
+        if (processorLocality.containsKey(containerId)) {
+          taskLocality.put(taskName, processorLocality.get(containerId));
+        }
+      }
+    });
+
+    Map<SystemStreamPartition, List<String>> sspToTaskMapping =
+        taskPartitionAssignmentManager.readTaskPartitionAssignments();
+    Map<TaskName, List<SystemStreamPartition>> taskPartitionAssignments = new HashMap<>();
+    // Task to partition assignments is stored as SystemStreamPartition to list of TaskName in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
+    // taskName to SystemStreamPartitions is done here to wire-in the data to JobModel.
+    sspToTaskMapping.forEach((systemStreamPartition, taskNames) -> taskNames.forEach(taskNameString -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        // computeIfAbsent only allocates a list the first time a task is seen
+        taskPartitionAssignments.computeIfAbsent(taskName, key -> new ArrayList<>()).add(systemStreamPartition);
+      }
+    }));
+    return new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId);
+  }
+
+  /**
+   * Retrieves and returns the processor locality of a samza job using provided {@link Config} and
+   * {@link LocalityManager}. Container ids are the integers from 0 (inclusive) to the configured container count
+   * (exclusive); a container without a non-empty stored host is mapped to the {@code ANY_HOST} location.
+   *
+   * @param config provides the configurations defined by the user; the container count is read from it.
+   * @param localityManager provides the processor to host mapping persisted to the metadata store.
+   * @return the processor locality.
+   */
+  private static Map<String, LocationId> getProcessorLocality(Config config, LocalityManager localityManager) {
+    Map<String, LocationId> containerToLocationId = new HashMap<>();
+    Map<String, ProcessorLocality> existingContainerLocality =
+        localityManager.readLocality().getProcessorLocalities();
+
+    // Read the container count once instead of rebuilding a JobConfig on every loop iteration.
+    int containerCount = new JobConfig(config).getContainerCount();
+    for (int i = 0; i < containerCount; i++) {
+      String containerId = Integer.toString(i);
+      LocationId locationId = Optional.ofNullable(existingContainerLocality.get(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotEmpty)
+          .map(LocationId::new)
+          // To handle the case when the container count is increased between two different runs of a samza-yarn job,
+          // set the locality of newly added containers to any_host. The fallback is only created when needed.
+          .orElseGet(() -> new LocationId("ANY_HOST"));
+      containerToLocationId.put(containerId, locationId);
+    }
+    return containerToLocationId;
+  }
+
+  /**
+   * This method does the following:
+   * 1. Deletes the existing task assignments if the number of active tasks differs from the number of previously
+   *    assigned tasks in {@code grouperMetadata}.
+   * 2. Deletes the existing standby task assignments if the set of standby tasks has changed.
+   * 3. Saves the newly generated task assignments to the storage layer through the {@code taskAssignmentManager}
+   *    and the {@code taskPartitionAssignmentManager}.
+   *
+   * @param jobModel              represents the {@link JobModel} of the samza job.
+   * @param taskAssignmentManager required to persist the processor to task assignments to the metadata store.
+   * @param taskPartitionAssignmentManager required to persist the task to partition assignments to the metadata store.
+   * @param grouperMetadata       provides the historical metadata of the samza application.
+   */
+  private void updateTaskAssignments(JobModel jobModel, TaskAssignmentManager taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, GrouperMetadata grouperMetadata) {
+    LOG.info("Storing the task assignments into metadata store.");
+    Set<String> activeTaskNames = new HashSet<>();
+    Set<String> standbyTaskNames = new HashSet<>();
+    Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>();
+
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        if (TaskMode.Active.equals(taskModel.getTaskMode())) {
+          activeTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        if (TaskMode.Standby.equals(taskModel.getTaskMode())) {
+          standbyTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions());
+      }
+    }
+
+    Map<TaskName, String> previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
+    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
+      LOG.warn(
+          "Current task count {} does not match saved task count {}. Stateful jobs may observe misalignment of keys!",
+          activeTaskNames.size(), previousTaskToContainerId.size());
+      // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
+      // without a much more complicated mapping. Further, the partition count may have changed, which means
+      // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
+      // data associated with the incoming keys. Warn the user and default to grouper
+      // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
+      taskAssignmentManager.deleteTaskContainerMappings(
+          previousTaskToContainerId.keySet().stream().map(TaskName::getTaskName).collect(Collectors.toList()));
+      taskPartitionAssignmentManager.delete(systemStreamPartitions);
+    }
+
+    // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
+    // changed, we log a warning and delete the existing mapping for these tasks
+    Set<String> previousStandbyTasks = taskAssignmentManager.readTaskModes()
+        .entrySet()
+        .stream()
+        .filter(taskNameToTaskModeEntry -> TaskMode.Standby.equals(taskNameToTaskModeEntry.getValue()))
+        .map(taskNameToTaskModeEntry -> taskNameToTaskModeEntry.getKey().getTaskName())
+        .collect(Collectors.toSet());
+    if (!standbyTaskNames.equals(previousStandbyTasks)) {
+      LOG.info("The set of standby tasks has changed, current standby tasks {}, previous standby tasks {}",
+          standbyTaskNames, previousStandbyTasks);
+      taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks);
+    }
+
+    // Task to partition assignments is stored as SystemStreamPartition to list of TaskName in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic.
+    Map<SystemStreamPartition, List<String>> sspToTaskNameMap = new HashMap<>();
+    Map<String, Map<String, TaskMode>> taskContainerMappings = new HashMap<>();
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      containerModel.getTasks().forEach((taskName, taskModel) -> {
+        // computeIfAbsent only allocates the nested collection the first time a key is seen
+        taskContainerMappings.computeIfAbsent(containerModel.getId(), containerId -> new HashMap<>())
+            .put(taskName.getTaskName(), taskModel.getTaskMode());
+        taskModel.getSystemStreamPartitions().forEach(systemStreamPartition ->
+            sspToTaskNameMap.computeIfAbsent(systemStreamPartition, ssp -> new ArrayList<>())
+                .add(taskName.getTaskName()));
+      });
+    }
+    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
+  }
+
+  /**
+   * Tells whether a task counts as active: it either has no entry in {@code taskModes} or its entry is
+   * {@code TaskMode.Active}.
+   *
+   * @param taskName the task to check.
+   * @param taskModes the known task modes, keyed by task name.
+   * @return true if the task is considered active, false otherwise.
+   */
+  private static boolean isActiveTask(TaskName taskName, Map<TaskName, TaskMode> taskModes) {
+    if (!taskModes.containsKey(taskName)) {
+      // No recorded mode for this task
+      return true;
+    }
+    return TaskMode.Active.equals(taskModes.get(taskName));
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 7bf29fe..a340bef 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -26,7 +26,7 @@ import 
org.apache.samza.container.grouper.task.GrouperMetadata;
 import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelCalculator;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -130,7 +130,8 @@ public class PassthroughJobCoordinator implements 
JobCoordinator {
       GrouperMetadata grouperMetadata =
           new GrouperMetadataImpl(ImmutableMap.of(String.valueOf(containerId), 
locationId), Collections.emptyMap(),
               Collections.emptyMap(), Collections.emptyMap());
-      return JobModelManager.readJobModel(this.config, Collections.emptyMap(), 
streamMetadataCache, grouperMetadata);
+      return JobModelCalculator.INSTANCE.calculateJobModel(this.config, 
Collections.emptyMap(), streamMetadataCache,
+          grouperMetadata);
     } finally {
       systemAdmins.stop();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index bc24752..17a8d5c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -42,7 +42,7 @@ import 
org.apache.samza.container.grouper.task.GrouperMetadata;
 import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelCalculator;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
@@ -396,7 +396,8 @@ public class ZkJobCoordinator implements JobCoordinator {
     }
 
     GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, 
processorNodes);
-    JobModel model = JobModelManager.readJobModel(config, 
changeLogPartitionMap, streamMetadataCache, grouperMetadata);
+    JobModel model = JobModelCalculator.INSTANCE.calculateJobModel(config, 
changeLogPartitionMap, streamMetadataCache,
+        grouperMetadata);
     return new JobModel(new MapConfig(), model.getContainers());
   }
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5dd662b..25ce582 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -19,39 +19,21 @@
 
 package org.apache.samza.coordinator
 
-import java.util
-import java.util.concurrent.atomic.AtomicReference
-
-import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.config._
-import org.apache.samza.config.Config
-import org.apache.samza.container.grouper.stream.SSPGrouperProxy
-import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import org.apache.samza.config.{Config, _}
+import org.apache.samza.container.{LocalityManager, TaskName}
 import org.apache.samza.container.grouper.task._
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
-import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
-import org.apache.samza.container.LocalityManager
-import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet, 
LocalityServlet}
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, 
SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
 import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskMode
-import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metadatastore.MetadataStore
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.runtime.LocationId
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.system._
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.Logging
 
-import scala.collection.JavaConverters
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import java.util
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * Helper companion object that is responsible for wiring up a JobModelManager
@@ -91,15 +73,13 @@ object JobModelManager extends Logging {
     try {
       systemAdmins.start()
       val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
-      val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, 
localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
-
-      val jobModel = readJobModel(config, changelogPartitionMapping, 
streamMetadataCache, grouperMetadata)
+      val jobModelHelper = new JobModelHelper(localityManager, 
taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE)
+      val jobModel = jobModelHelper.newJobModel(config, 
changelogPartitionMapping)
       val jobModelToServe = new JobModel(jobModel.getConfig, 
jobModel.getContainers)
       val serializedJobModelToServe = 
SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
       serializedJobModelRef.set(serializedJobModelToServe)
 
-      updateTaskAssignments(jobModel, taskAssignmentManager, 
taskPartitionAssignmentManager, grouperMetadata)
-
       val clusterManagerConfig = new ClusterManagerConfig(config)
       val server = new HttpServer(port = 
clusterManagerConfig.getCoordinatorUrlPort)
       server.addServlet("/", new JobServlet(serializedJobModelRef))
@@ -112,322 +92,6 @@ object JobModelManager extends Logging {
       // Not closing coordinatorStreamStore, since {@code 
ClusterBasedJobCoordinator} uses it to read container locality through {@code 
JobModel}.
     }
   }
-
-  /**
-    * Builds the {@see GrouperMetadataImpl} for the samza job.
-    * @param config represents the configurations defined by the user.
-    * @param localityManager provides the processor to host mapping persisted 
to the metadata store.
-    * @param taskAssignmentManager provides the processor to task assignments 
persisted to the metadata store.
-    * @param taskPartitionAssignmentManager provides the task to partition 
assignments persisted to the metadata store.
-    * @return the instantiated {@see GrouperMetadata}.
-    */
-  def getGrouperMetadata(config: Config, localityManager: LocalityManager, 
taskAssignmentManager: TaskAssignmentManager, taskPartitionAssignmentManager: 
TaskPartitionAssignmentManager) = {
-    val processorLocality: util.Map[String, LocationId] = 
getProcessorLocality(config, localityManager)
-    val taskModes: util.Map[TaskName, TaskMode] = 
taskAssignmentManager.readTaskModes()
-
-    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have 
no task-mode or have an active task mode
-    val taskAssignment: util.Map[String, String] = 
taskAssignmentManager.readTaskAssignment().
-      filterKeys(taskName => !taskModes.containsKey(new TaskName(taskName)) || 
taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
-
-
-    val taskNameToProcessorId: util.Map[TaskName, String] = new 
util.HashMap[TaskName, String]()
-    for ((taskName, processorId) <- taskAssignment) {
-      taskNameToProcessorId.put(new TaskName(taskName), processorId)
-    }
-
-    val taskLocality: util.Map[TaskName, LocationId] = new 
util.HashMap[TaskName, LocationId]()
-    for ((taskName, processorId) <- taskAssignment) {
-      if (processorLocality.containsKey(processorId)) {
-        taskLocality.put(new TaskName(taskName), 
processorLocality.get(processorId))
-      }
-    }
-
-    val sspToTaskMapping: util.Map[SystemStreamPartition, util.List[String]] = 
taskPartitionAssignmentManager.readTaskPartitionAssignments()
-    val taskPartitionAssignments: util.Map[TaskName, 
util.List[SystemStreamPartition]] = new util.HashMap[TaskName, 
util.List[SystemStreamPartition]]()
-
-    // Task to partition assignments is stored as {@see SystemStreamPartition} 
to list of {@see TaskName} in
-    // coordinator stream. This is done due to the 1 MB value size limit in a 
kafka topic. Conversion to
-    // taskName to SystemStreamPartitions is done here to wire-in the data to 
{@see JobModel}.
-    sspToTaskMapping foreach { case (systemStreamPartition: 
SystemStreamPartition, taskNames: util.List[String]) =>
-      for (task <- taskNames) {
-        val taskName: TaskName = new TaskName(task)
-
-        // We read the partition assignments only for active-tasks, i.e., 
tasks that have no task-mode or have an active task mode
-        if (!taskModes.containsKey(taskName) || 
taskModes.get(taskName).eq(TaskMode.Active)) {
-          taskPartitionAssignments.putIfAbsent(taskName, new 
util.ArrayList[SystemStreamPartition]())
-          taskPartitionAssignments.get(taskName).add(systemStreamPartition)
-        }
-      }
-    }
-    new GrouperMetadataImpl(processorLocality, taskLocality, 
taskPartitionAssignments, taskNameToProcessorId)
-  }
-
-  /**
-    * Retrieves and returns the processor locality of a samza job using 
provided {@see Config} and {@see LocalityManager}.
-    * @param config provides the configurations defined by the user. Required 
to connect to the storage layer.
-    * @param localityManager provides the processor to host mapping persisted 
to the metadata store.
-    * @return the processor locality.
-    */
-  def getProcessorLocality(config: Config, localityManager: LocalityManager) = 
{
-    val containerToLocationId: util.Map[String, LocationId] = new 
util.HashMap[String, LocationId]()
-    val existingContainerLocality = 
localityManager.readLocality().getProcessorLocalities
-
-    for (containerId <- 0 until new JobConfig(config).getContainerCount) {
-      val preferredHost = 
Option.apply(existingContainerLocality.get(containerId.toString))
-        .map(containerLocality => containerLocality.host())
-        .filter(host => host.nonEmpty)
-        .orNull
-      // To handle the case when the container count is increased between two 
different runs of a samza-yarn job,
-      // set the locality of newly added containers to any_host.
-      var locationId: LocationId = new LocationId("ANY_HOST")
-      if (preferredHost != null) {
-        locationId = new LocationId(preferredHost)
-      }
-      containerToLocationId.put(containerId.toString, locationId)
-    }
-
-    containerToLocationId
-  }
-
-  /**
-    * This method does the following:
-    * 1. Deletes the existing task assignments if the partition-task grouping 
has changed from the previous run of the job.
-    * 2. Saves the newly generated task assignments to the storage layer 
through the {@param TaskAssignementManager}.
-    *
-    * @param jobModel              represents the {@see JobModel} of the samza 
job.
-    * @param taskAssignmentManager required to persist the processor to task 
assignments to the metadata store.
-    * @param taskPartitionAssignmentManager required to persist the task to 
partition assignments to the metadata store.
-    * @param grouperMetadata       provides the historical metadata of the 
samza application.
-    */
-  def updateTaskAssignments(jobModel: JobModel,
-                            taskAssignmentManager: TaskAssignmentManager,
-                            taskPartitionAssignmentManager: 
TaskPartitionAssignmentManager,
-                            grouperMetadata: GrouperMetadata): Unit = {
-    info("Storing the task assignments into metadata store.")
-    val activeTaskNames: util.Set[String] = new util.HashSet[String]()
-    val standbyTaskNames: util.Set[String] = new util.HashSet[String]()
-    val systemStreamPartitions: util.Set[SystemStreamPartition] = new 
util.HashSet[SystemStreamPartition]()
-    for (container <- jobModel.getContainers.values()) {
-      for (taskModel <- container.getTasks.values()) {
-        if(taskModel.getTaskMode.eq(TaskMode.Active)) {
-          activeTaskNames.add(taskModel.getTaskName.getTaskName)
-        }
-
-        if(taskModel.getTaskMode.eq(TaskMode.Standby)) {
-          standbyTaskNames.add(taskModel.getTaskName.getTaskName)
-        }
-        systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions)
-      }
-    }
-
-    val previousTaskToContainerId = 
grouperMetadata.getPreviousTaskToProcessorAssignment
-    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
-      warn("Current task count %s does not match saved task count %s. Stateful 
jobs may observe misalignment of keys!"
-        format (activeTaskNames.size(), previousTaskToContainerId.size()))
-      // If the tasks changed, then the partition-task grouping is also likely 
changed and we can't handle that
-      // without a much more complicated mapping. Further, the partition count 
may have changed, which means
-      // input message keys are likely reshuffled w.r.t. partitions, so the 
local state may not contain necessary
-      // data associated with the incoming keys. Warn the user and default to 
grouper
-      // In this scenario the tasks may have been reduced, so we need to 
delete all the existing messages
-      
taskAssignmentManager.deleteTaskContainerMappings(previousTaskToContainerId.keys.map(taskName
 => taskName.getTaskName).asJava)
-      taskPartitionAssignmentManager.delete(systemStreamPartitions)
-    }
-
-    // if the set of standby tasks has changed, e.g., when the 
replication-factor changed, or the active-tasks-set has
-    // changed, we log a warning and delete the existing mapping for these 
tasks
-    val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x 
=> x._2.eq(TaskMode.Standby))
-    if(standbyTaskNames.asScala.eq(previousStandbyTasks.keySet)) {
-      info("The set of standby tasks has changed, current standby tasks %s, 
previous standby tasks %s" format (standbyTaskNames, 
previousStandbyTasks.keySet))
-      
taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => 
x._1.getTaskName).asJava)
-    }
-
-    // Task to partition assignments is stored as {@see SystemStreamPartition} 
to list of {@see TaskName} in
-    // coordinator stream. This is done due to the 1 MB value size limit in a 
kafka topic. Conversion to
-    // taskName to SystemStreamPartitions is done here to wire-in the data to 
{@see JobModel}.
-    val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = 
new util.HashMap[SystemStreamPartition, util.List[String]]()
-
-    val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = 
new util.HashMap[String, util.Map[String, TaskMode]]()
-
-    for (container <- jobModel.getContainers.values()) {
-      for ((taskName, taskModel) <- container.getTasks) {
-        taskContainerMappings.putIfAbsent(container.getId, new 
util.HashMap[String, TaskMode]())
-        taskContainerMappings.get(container.getId).put(taskName.getTaskName, 
container.getTasks.get(taskName).getTaskMode)
-        for (partition <- taskModel.getSystemStreamPartitions) {
-          if (!sspToTaskNameMap.containsKey(partition)) {
-            sspToTaskNameMap.put(partition, new util.ArrayList[String]())
-          }
-          sspToTaskNameMap.get(partition).add(taskName.getTaskName)
-        }
-      }
-    }
-
-    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings)
-    
taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
-  }
-
-  /**
-    * Computes the input system stream partitions of a samza job using the 
provided {@param config}
-    * and {@param streamMetadataCache}.
-    * @param config the configuration of the job.
-    * @param streamMetadataCache to query the partition metadata of the input 
streams.
-    * @return the input {@see SystemStreamPartition} of the samza job.
-    */
-  private def getInputStreamPartitions(config: Config, streamMetadataCache: 
StreamMetadataCache): Set[SystemStreamPartition] = {
-
-    val taskConfig = new TaskConfig(config)
-    // Expand regex input, if a regex-rewriter is defined in config
-    val inputSystemStreams =
-      
JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
-
-    // Get the set of partitions for each SystemStream from the stream metadata
-    streamMetadataCache
-      .getStreamMetadata(inputSystemStreams, partitionsMetadataOnly = true)
-      .flatMap {
-        case (systemStream, metadata) =>
-          metadata
-            .getSystemStreamPartitionMetadata
-            .asScala
-            .keys
-            .map(new SystemStreamPartition(systemStream, _))
-      }.toSet
-  }
-
-  /**
-    * Builds the input {@see SystemStreamPartition} based upon the {@param 
config} defined by the user.
-    * @param config configuration to fetch the metadata of the input streams.
-    * @param streamMetadataCache required to query the partition metadata of 
the input streams.
-    * @return the input SystemStreamPartitions of the job.
-    */
-  private def getMatchedInputStreamPartitions(config: Config, 
streamMetadataCache: StreamMetadataCache):
-    Set[SystemStreamPartition] = {
-    val allSystemStreamPartitions = getInputStreamPartitions(config, 
streamMetadataCache)
-    val jobConfig = new JobConfig(config)
-    JavaOptionals.toRichOptional(jobConfig.getSSPMatcherClass).toOption match {
-      case Some(sspMatcherClassName) =>
-        val jfr = jobConfig.getSSPMatcherConfigJobFactoryRegex.r
-        
JavaOptionals.toRichOptional(jobConfig.getStreamJobFactoryClass).toOption match 
{
-          case Some(jfr(_*)) =>
-            info("before match: allSystemStreamPartitions.size = %s" format 
allSystemStreamPartitions.size)
-            val sspMatcher = ReflectionUtil.getObj(sspMatcherClassName, 
classOf[SystemStreamPartitionMatcher])
-            val matchedPartitions = 
sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet
-            // Usually a small set hence ok to log at info level
-            info("after match: matchedPartitions = %s" format 
matchedPartitions)
-            matchedPartitions
-          case _ => allSystemStreamPartitions
-        }
-      case _ => allSystemStreamPartitions
-    }
-  }
-
-  /**
-    * Finds the {@see SystemStreamPartitionGrouperFactory} from the {@param 
config}. Instantiates the  {@see SystemStreamPartitionGrouper}
-    * object through the factory.
-    * @param config the configuration of the samza job.
-    * @return the instantiated {@see SystemStreamPartitionGrouper}.
-    */
-  private def getSystemStreamPartitionGrouper(config: Config) = {
-    val factoryString = new 
JobConfig(config).getSystemStreamPartitionGrouperFactory
-    val factory = ReflectionUtil.getObj(factoryString, 
classOf[SystemStreamPartitionGrouperFactory])
-    factory.getSystemStreamPartitionGrouper(config)
-  }
-
-  /**
-   * Refresh Kafka topic list used as input streams if enabled {@link 
org.apache.samza.config.RegExTopicGenerator}
-   * @param config Samza job config
-   * @return refreshed config
-   */
-  private def refreshConfigByRegexTopicRewriter(config: Config): Config = {
-    val jobConfig = new JobConfig(config)
-    JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
-      case Some(rewriters) => rewriters.split(",").
-        filter(rewriterName => 
JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
-          .getOrElse(throw new SamzaException("Unable to find class config for 
config rewriter %s." format rewriterName))
-          .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
-        foldLeft(config)(ConfigUtil.applyRewriter(_, _))
-      case _ => config
-    }
-  }
-
-  /**
-    * Does the following:
-    * 1. Fetches metadata of the input streams defined in configuration 
through {@param streamMetadataCache}.
-    * 2. Applies the {@see SystemStreamPartitionGrouper}, {@see 
TaskNameGrouper} defined in the configuration
-    * to build the {@see JobModel}.
-    * @param originalConfig the configuration of the job.
-    * @param changeLogPartitionMapping the task to changelog partition mapping 
of the job.
-    * @param streamMetadataCache the cache that holds the partition metadata 
of the input streams.
-    * @param grouperMetadata provides the historical metadata of the 
application.
-    * @return the built {@see JobModel}.
-    */
-  def readJobModel(originalConfig: Config,
-                   changeLogPartitionMapping: util.Map[TaskName, Integer],
-                   streamMetadataCache: StreamMetadataCache,
-                   grouperMetadata: GrouperMetadata): JobModel = {
-    // refresh config if enabled regex topic rewriter
-    val config = refreshConfigByRegexTopicRewriter(originalConfig)
-
-    val taskConfig = new TaskConfig(config)
-    // Do grouping to fetch TaskName to SSP mapping
-    val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, 
streamMetadataCache)
-
-    // processor list is required by some of the groupers. So, let's pass them 
as part of the config.
-    // Copy the config and add the processor list to the config copy.
-    // TODO: It is non-ideal to have config as a medium to transmit the 
locality information; especially, if the locality information evolves. Evaluate 
options on using context objects to pass dependent components.
-    val configMap = new util.HashMap[String, String](config)
-    configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", 
grouperMetadata.getProcessorLocality.keySet()))
-    val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
-
-    val jobConfig = new JobConfig(config)
-
-    val groups: util.Map[TaskName, util.Set[SystemStreamPartition]] = if 
(jobConfig.isSSPGrouperProxyEnabled) {
-      val sspGrouperProxy: SSPGrouperProxy =  new SSPGrouperProxy(config, 
grouper)
-      sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata)
-    } else {
-      warn("SSPGrouperProxy is disabled (%s = false). Stateful jobs may 
produce erroneous results if this is not enabled." format 
JobConfig.SSP_INPUT_EXPANSION_ENABLED)
-      grouper.group(allSystemStreamPartitions)
-    }
-    info("SystemStreamPartitionGrouper %s has grouped the 
SystemStreamPartitions into %d tasks with the following taskNames: %s" 
format(grouper, groups.size(), groups))
-
-    // If no mappings are present(first time the job is running) we return -1, 
this will allow 0 to be the first change
-    // mapping.
-    var maxChangelogPartitionId = 
changeLogPartitionMapping.asScala.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
-    // Sort the groups prior to assigning the changelog mapping so that the 
mapping is reproducible and intuitive
-    val sortedGroups = new util.TreeMap[TaskName, 
util.Set[SystemStreamPartition]](groups)
-
-    // Assign all SystemStreamPartitions to TaskNames.
-    val taskModels = {
-      sortedGroups.asScala.map { case (taskName, systemStreamPartitions) =>
-        val changelogPartition = 
Option(changeLogPartitionMapping.get(taskName)) match {
-          case Some(changelogPartitionId) => new 
Partition(changelogPartitionId)
-          case _ =>
-            // If we've never seen this TaskName before, then assign it a
-            // new changelog.
-            maxChangelogPartitionId += 1
-            info("New task %s is being assigned changelog partition %s." 
format(taskName, maxChangelogPartitionId))
-            new Partition(maxChangelogPartitionId)
-        }
-        new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-      }.toSet
-    }
-
-    // Here is where we should put in a pluggable option for the
-    // SSPTaskNameGrouper for locality, load-balancing, etc.
-    val containerGrouperFactory =
-      ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory, 
classOf[TaskNameGrouperFactory])
-    val standbyTasksEnabled = jobConfig.getStandbyTasksEnabled
-    val standbyTaskReplicationFactor = 
jobConfig.getStandbyTaskReplicationFactor
-    val taskNameGrouperProxy = new 
TaskNameGrouperProxy(containerGrouperFactory.build(config), 
standbyTasksEnabled, standbyTaskReplicationFactor)
-    var containerModels: util.Set[ContainerModel] = null
-    val isHostAffinityEnabled = new 
ClusterManagerConfig(config).getHostAffinityEnabled
-    if(isHostAffinityEnabled) {
-      containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata)
-    } else {
-      containerModels = taskNameGrouperProxy.group(taskModels, new 
util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
-    }
-
-    val containerMap = containerModels.asScala.map(containerModel => 
containerModel.getId -> containerModel).toMap
-    new JobModel(config, containerMap.asJava)
-  }
 }
 
 /**
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java
new file mode 100644
index 0000000..1210d40
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GroupByContainerCount;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskNameGrouper;
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionMatcher;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.collection.JavaConverters;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unfortunately, we still need powermock for testing the regex topic flow, 
since that flow checks for a specific
+ * rewriter class, and setting up the support for that specific class is too 
cumbersome.
+ * Many of the tests do not need powermock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConfigUtil.class})
+public class TestJobModelCalculator {
+  private static final String REGEX_REWRITER0 = "regexRewriter0";
+  private static final String REGEX_REWRITER1 = "regexRewriter1";
+  private static final SystemStream SYSTEM_STREAM0 = new 
SystemStream("system0", "stream0");
+  private static final SystemStream SYSTEM_STREAM1 = new 
SystemStream("system1", "stream1");
+
+  @Mock
+  private StreamMetadataCache streamMetadataCache;
+  @Mock
+  private GrouperMetadata grouperMetadata;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    addStreamMetadataCacheMetadata(this.streamMetadataCache,
+        ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4), 
SYSTEM_STREAM1, buildSystemStreamMetadata(3)));
+  }
+
+  @Test
+  public void testBasicSingleStream() {
+    addStreamMetadataCacheMetadata(this.streamMetadataCache,
+        ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4)));
+    Map<TaskName, Integer> changeLogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0), 
ImmutableMap.of());
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0), 
taskName(2), taskModel(2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1), 
taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changeLogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testBasicMultipleStreams() {
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), 
ImmutableMap.of());
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testCustomSSPGrouper() {
+    // custom grouper only groups into two tasks, so only need 2 changelog 
partitions
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(2);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+        ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, 
Partition0SeparateFactory.class.getName()));
+    when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+        ImmutableMap.of("0", mock(LocationId.class), "1", 
mock(LocationId.class)));
+    Set<SystemStreamPartition> sspsForTask1 = new 
ImmutableSet.Builder<SystemStreamPartition>().add(
+        new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)))
+        .add(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(2)))
+        .add(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(3)))
+        .add(new SystemStreamPartition(SYSTEM_STREAM1, new Partition(1)))
+        .add(new SystemStreamPartition(SYSTEM_STREAM1, new Partition(2)))
+        .build();
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+            new ContainerModel("1",
+                ImmutableMap.of(taskName(1), new TaskModel(taskName(1), 
sspsForTask1, new Partition(1)))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Checks that a task-name grouper configured through
+   * {@code TaskConfig.GROUPER_FACTORY} determines the container layout. With
+   * {@code Task0SeparateFactory}, the expected model has task 0 alone in
+   * container "0" and tasks 1, 2 and 3 in container "1".
+   */
+  @Test
+  public void testCustomTaskNameGrouper() {
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+        ImmutableMap.of(TaskConfig.GROUPER_FACTORY, 
Task0SeparateFactory.class.getName()));
+    when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+        ImmutableMap.of("0", mock(LocationId.class), "1", 
mock(LocationId.class)));
+    // task 3 only has an SSP from SYSTEM_STREAM0 (two-argument taskModel)
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+            new ContainerModel("1",
+                ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(2), 
taskModel(2, 2, 2), taskName(3),
+                    taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Checks that the calculated job model reflects inputs added by config
+   * rewriters. {@code ConfigUtil.applyRewriter} is statically mocked so that
+   * REGEX_REWRITER0 appends SYSTEM_STREAM0 and REGEX_REWRITER1 appends
+   * SYSTEM_STREAM1 to the configured inputs. The expected job model carries the
+   * rewritten input list in its config, and task 0 consumes partition 0 of all
+   * three streams.
+   */
+  @Test
+  public void testWithRegexTopicRewriters() {
+    // this is the SystemStream that is directly in the config
+    SystemStream existingSystemStream = new SystemStream("existingSystem", 
"existingStream");
+    // the stubbed metadata lookup covers all three streams; the existing stream
+    // has a single partition
+    addStreamMetadataCacheMetadata(this.streamMetadataCache,
+        ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4), 
SYSTEM_STREAM1, buildSystemStreamMetadata(3),
+            existingSystemStream, buildSystemStreamMetadata(1)));
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+
+    PowerMockito.mockStatic(ConfigUtil.class);
+    // add SYSTEM_STREAM0 for one rewriter
+    PowerMockito.when(ConfigUtil.applyRewriter(any(), eq(REGEX_REWRITER0)))
+        .thenAnswer(invocation -> addSystemStreamInput(SYSTEM_STREAM0, 
invocation.getArgumentAt(0, Config.class)));
+    // add SYSTEM_STREAM1 for another rewriter
+    PowerMockito.when(ConfigUtil.applyRewriter(any(), eq(REGEX_REWRITER1)))
+        .thenAnswer(invocation -> addSystemStreamInput(SYSTEM_STREAM1, 
invocation.getArgumentAt(0, Config.class)));
+
+    Config config = config(ImmutableList.of(existingSystemStream),
+        ImmutableMap.of(JobConfig.CONFIG_REWRITERS, String.format("%s,%s", 
REGEX_REWRITER0, REGEX_REWRITER1),
+            String.format(JobConfig.CONFIG_REWRITER_CLASS, REGEX_REWRITER0), 
RegExTopicGenerator.class.getName(),
+            String.format(JobConfig.CONFIG_REWRITER_CLASS, REGEX_REWRITER1), 
RegExTopicGenerator.class.getName()));
+    Set<SystemStreamPartition> sspsForTask0 =
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new 
Partition(0)),
+            new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0)),
+            new SystemStreamPartition(existingSystemStream, new Partition(0)));
+    TaskModel taskModel0 = new TaskModel(taskName(0), sspsForTask0, new 
Partition(0));
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel0, 
taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    // the expected config is the input config with the inputs replaced by the
+    // existing stream followed by the two streams the rewriters appended
+    Map<String, String> expectedConfigMap = new HashMap<>(config);
+    expectedConfigMap.put(TaskConfig.INPUT_STREAMS,
+        String.format("%s,%s,%s", taskInputString(existingSystemStream), 
taskInputString(SYSTEM_STREAM0),
+            taskInputString(SYSTEM_STREAM1)));
+    JobModel expected = new JobModel(new MapConfig(expectedConfigMap), 
containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Checks that an SSP matcher is applied when the configured job factory
+   * class matches {@code JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX}. With
+   * {@code Partition0Or1Filter}, only tasks 0 and 1 are expected in the model.
+   */
+  @Test
+  public void testWithSSPFilter() {
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+        ImmutableMap.of(JobConfig.SSP_MATCHER_CLASS, 
Partition0Or1Filter.class.getName(),
+            JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, ".*MyJobFactory",
+            // this needs to match the regex in the line above
+            JobConfig.STREAM_JOB_FACTORY_CLASS, 
"org.apache.samza.custom.MyJobFactory"));
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 
1, 1))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Counterpart of {@code testWithSSPFilter}: the same SSP matcher is
+   * configured, but the job factory class does not match the matcher's job
+   * factory regex. The expected model still contains all four tasks.
+   */
+  @Test
+  public void testSSPMatcherConfigJobFactoryRegexNotMatched() {
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+        ImmutableMap.of(JobConfig.SSP_MATCHER_CLASS, 
Partition0Or1Filter.class.getName(),
+            JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, ".*MyJobFactory",
+            // this needs to not match the regex in the line above
+            JobConfig.STREAM_JOB_FACTORY_CLASS, 
"org.apache.samza.custom.OtherJobFactory"));
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Passes an empty changelog partition mapping to the calculator. The
+   * expected task models (built by the {@code taskModel} helpers) each use a
+   * changelog partition equal to the task id.
+   */
+  @Test
+  public void testNoPreviousTasksAssignsNewChangelogPartitions() {
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), 
ImmutableMap.of());
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual = JobModelCalculator.INSTANCE.calculateJobModel(config, 
ImmutableMap.of(), this.streamMetadataCache,
+        this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Checks that changelog partitions from a previous mapping are kept. Tasks 0
+   * and 1 come in with changelog partitions 1 and 0 respectively and are
+   * expected to keep them, while tasks 2 and 3 are expected to get changelog
+   * partitions 2 and 3.
+   */
+  @Test
+  public void testPreviousChangelogPartitionsMaintained() {
+    // existing changelog mapping has 2 tasks, but the job model ultimately 
will need 4 tasks
+    // intentionally using an "out-of-order" changelog mapping to make sure it 
gets maintained
+    Map<TaskName, Integer> changelogPartitionMapping = 
ImmutableMap.of(taskName(0), 1, taskName(1), 0);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), 
ImmutableMap.of());
+    // these task models have special changelog partitions from the previous 
mapping
+    TaskModel taskModel0 = new TaskModel(taskName(0),
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new 
Partition(0)),
+            new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))), new 
Partition(1));
+    TaskModel taskModel1 = new TaskModel(taskName(1),
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new 
Partition(1)),
+            new SystemStreamPartition(SYSTEM_STREAM1, new Partition(1))), new 
Partition(0));
+    // tasks 2 and 3 will get assigned new changelog partitions
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel0, 
taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1, 
taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Checks the result when a store factory is configured together with a
+   * custom SSP grouper and a previous task-to-SSP assignment is available from
+   * the grouper metadata. The expected model puts partitions 0 and 2 of
+   * SYSTEM_STREAM0 on task 0 and partitions 1 and 3 on task 1, rather than the
+   * "partition 0 versus everything else" split that the custom grouper
+   * produces on its own.
+   */
+  @Test
+  public void testSSPGrouperProxyUsed() {
+    // only SYSTEM_STREAM0 (4 partitions) is an input for this test
+    addStreamMetadataCacheMetadata(this.streamMetadataCache,
+        ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4)));
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(2);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0),
+        ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, 
Partition0SeparateFactory.class.getName(),
+            // need this to trigger SSPGrouperProxy logic
+            String.format(StorageConfig.FACTORY, "myStore"), "MyCustomStore"));
+    // custom SSP grouper expects a certain processor locality for another 
test, so add the locality here too
+    when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+        ImmutableMap.of("0", mock(LocationId.class), "1", 
mock(LocationId.class)));
+    /*
+     * Even though the custom grouper factory would normally send the 
additional SSPs to task 1, the SSP grouper proxy
+     * should give task 0 some of the SSPs.
+     */
+    when(this.grouperMetadata.getPreviousTaskToSSPAssignment()).thenReturn(
+        ImmutableMap.of(taskName(0), ImmutableList.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(0))),
+            taskName(1), ImmutableList.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)))));
+    Set<SystemStreamPartition> sspsForTask0 =
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new 
Partition(0)),
+            new SystemStreamPartition(SYSTEM_STREAM0, new Partition(2)));
+    Set<SystemStreamPartition> sspsForTask1 =
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new 
Partition(1)),
+            new SystemStreamPartition(SYSTEM_STREAM0, new Partition(3)));
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0", new 
ContainerModel("0",
+            ImmutableMap.of(taskName(0), new TaskModel(taskName(0), 
sspsForTask0, new Partition(0)))), "1",
+        new ContainerModel("1",
+            ImmutableMap.of(taskName(1), new TaskModel(taskName(1), 
sspsForTask1, new Partition(1)))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Runs the calculator with host affinity enabled and a task-name grouper
+   * whose {@code group(Set, List)} overload throws, so the test fails if that
+   * overload is the one invoked. The expected model is the four-task,
+   * two-container layout.
+   */
+  @Test
+  public void testHostAffinityEnabled() {
+    Map<TaskName, Integer> changelogPartitionMapping = 
changelogPartitionMapping(4);
+    Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+        ImmutableMap.of(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true",
+            // make sure the group method which accepts GrouperMetadata is used
+            TaskConfig.GROUPER_FACTORY, 
GroupByContainerCountOverrideFactory.class.getName()));
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    JobModel expected = new JobModel(config, containerModels);
+    JobModel actual =
+        JobModelCalculator.INSTANCE.calculateJobModel(config, 
changelogPartitionMapping, this.streamMetadataCache,
+            this.grouperMetadata);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Creates a mocked {@link SystemStreamMetadata} that reports partitions
+   * {@code 0} (inclusive) to {@code numPartitions} (exclusive), each mapped to
+   * its own mocked partition metadata.
+   */
+  private static SystemStreamMetadata buildSystemStreamMetadata(int numPartitions) {
+    SystemStreamMetadata streamMetadata = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataByPartition =
+        new HashMap<>();
+    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+      metadataByPartition.put(new Partition(partitionId),
+          mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    }
+    when(streamMetadata.getSystemStreamPartitionMetadata()).thenReturn(metadataByPartition);
+    return streamMetadata;
+  }
+
+  /**
+   * Builds the {@link TaskName} used for task {@code id} throughout these
+   * tests, i.e. "Partition " followed by the id.
+   */
+  private static TaskName taskName(int id) {
+    String name = "Partition " + id;
+    return new TaskName(name);
+  }
+
+  /**
+   * Builds the expected {@link TaskModel} for task {@code id} when the task
+   * consumes only one partition of {@code SYSTEM_STREAM0}.
+   *
+   * @param id task id; also used as the changelog partition
+   * @param partitionForSystemStream0 partition of SYSTEM_STREAM0 the task reads
+   * @return the task model for the given task
+   */
+  private static TaskModel taskModel(int id, int partitionForSystemStream0) {
+    // build the name through taskName(id), like the three-argument overload, so
+    // the task-name format is defined in a single place
+    return new TaskModel(taskName(id),
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0))),
+        new Partition(id));
+  }
+
+  /**
+   * Builds the expected {@link TaskModel} for task {@code id} when the task
+   * consumes one partition of {@code SYSTEM_STREAM0} and one partition of
+   * {@code SYSTEM_STREAM1}. The changelog partition equals {@code id}.
+   */
+  private static TaskModel taskModel(int id, int partitionForSystemStream0, int partitionForSystemStream1) {
+    SystemStreamPartition ssp0 =
+        new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0));
+    SystemStreamPartition ssp1 =
+        new SystemStreamPartition(SYSTEM_STREAM1, new Partition(partitionForSystemStream1));
+    Partition changelogPartition = new Partition(id);
+    return new TaskModel(taskName(id), ImmutableSet.of(ssp0, ssp1), changelogPartition);
+  }
+
+  /**
+   * Builds a changelog mapping in which task {@code i} maps to changelog
+   * partition {@code i}, for {@code i} from 0 (inclusive) to
+   * {@code numPartitions} (exclusive).
+   */
+  private static Map<TaskName, Integer> changelogPartitionMapping(int numPartitions) {
+    Map<TaskName, Integer> mapping = new HashMap<>();
+    for (int i = 0; i < numPartitions; i++) {
+      mapping.put(taskName(i), i);
+    }
+    return mapping;
+  }
+
+  /**
+   * Builds a job config with a container count of 2 and the given input
+   * streams, then layers {@code extraConfigs} on top (so extra entries win on
+   * key collisions).
+   */
+  private static Config config(List<SystemStream> inputs, Map<String, String> extraConfigs) {
+    String inputStreams = inputs.stream()
+        .map(TestJobModelCalculator::taskInputString)
+        .collect(Collectors.joining(","));
+    Map<String, String> entries = new HashMap<>();
+    entries.put(JobConfig.JOB_CONTAINER_COUNT, "2");
+    entries.put(TaskConfig.INPUT_STREAMS, inputStreams);
+    // extras go last so that they can override the defaults above
+    entries.putAll(extraConfigs);
+    return new MapConfig(entries);
+  }
+
+  /**
+   * Formats a stream as {@code <system>.<stream>}, the form used for entries of
+   * the input-streams config value in these tests.
+   */
+  private static String taskInputString(SystemStream systemStream) {
+    String system = systemStream.getSystem();
+    String stream = systemStream.getStream();
+    return system + "." + stream;
+  }
+
+  /**
+   * Stubs {@code mockStreamMetadataCache.getStreamMetadata} so that a lookup of
+   * exactly the key set of {@code systemStreamMetadataMap} (with the boolean
+   * argument {@code true}) returns that map, converted to the Scala collection
+   * types used by the call.
+   */
+  private static void addStreamMetadataCacheMetadata(StreamMetadataCache mockStreamMetadataCache,
+      Map<SystemStream, SystemStreamMetadata> systemStreamMetadataMap) {
+    scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> metadataAsScala =
+        ScalaJavaUtil.toScalaMap(systemStreamMetadataMap);
+    Set<SystemStream> requestedStreams = systemStreamMetadataMap.keySet();
+    scala.collection.immutable.Set<SystemStream> requestedStreamsAsScala =
+        JavaConverters.asScalaSetConverter(requestedStreams).asScala().toSet();
+    when(mockStreamMetadataCache.getStreamMetadata(requestedStreamsAsScala, true))
+        .thenReturn(metadataAsScala);
+  }
+
+  public static class Partition0SeparateFactory implements 
SystemStreamPartitionGrouperFactory {
+    @Override
+    public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config 
config) {
+      // check that the "processor.list" gets passed through the config
+      assertEquals(ImmutableSet.of("0", "1"), 
ImmutableSet.copyOf(config.get(JobConfig.PROCESSOR_LIST).split(",")));
+      return new Partition0Separate();
+    }
+  }
+
+  /**
+   * Groups all SSPs into two tasks. The first task gets all of the SSPs with 
a partition id of 0. The second task gets
+   * all other SSPs. This is used for testing a custom {@link 
SystemStreamPartitionGrouper}.
+   */
+  private static class Partition0Separate implements 
SystemStreamPartitionGrouper {
+    @Override
+    public Map<TaskName, Set<SystemStreamPartition>> 
group(Set<SystemStreamPartition> systemStreamPartitions) {
+      // if partition 0, then add to this first set
+      Set<SystemStreamPartition> sspsForTask0 = new HashSet<>();
+      // all SSPs that are not partition 0 go into this second set
+      Set<SystemStreamPartition> sspsForTask1 = new HashSet<>();
+      systemStreamPartitions.forEach(ssp -> {
+        if (ssp.getPartition().getPartitionId() == 0) {
+          sspsForTask0.add(ssp);
+        } else {
+          sspsForTask1.add(ssp);
+        }
+      });
+      return ImmutableMap.of(taskName(0), sspsForTask0, taskName(1), 
sspsForTask1);
+    }
+  }
+
+  public static class Task0SeparateFactory implements TaskNameGrouperFactory {
+    @Override
+    public TaskNameGrouper build(Config config) {
+      return new Task0Separate();
+    }
+  }
+
+  /**
+   * Custom {@link TaskNameGrouper} for tests which always produces containers
+   * "0" and "1": task 0 is placed in container "0" and every other task is
+   * placed in container "1". The {@code containerIds} argument is ignored.
+   */
+  private static class Task0Separate implements TaskNameGrouper {
+    @Override
+    public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containerIds) {
+      TaskName task0 = taskName(0);
+      Map<TaskName, TaskModel> container0Tasks = new HashMap<>();
+      Map<TaskName, TaskModel> container1Tasks = new HashMap<>();
+      for (TaskModel model : taskModels) {
+        TaskName name = model.getTaskName();
+        if (!task0.equals(name)) {
+          // anything that is not task 0 belongs to container "1"
+          container1Tasks.put(name, model);
+          continue;
+        }
+        container0Tasks.put(task0, model);
+      }
+      ContainerModel container0 = new ContainerModel("0", container0Tasks);
+      ContainerModel container1 = new ContainerModel("1", container1Tasks);
+      return ImmutableSet.of(container0, container1);
+    }
+  }
+
+  /**
+   * {@link SystemStreamPartitionMatcher} for tests which keeps only the SSPs
+   * whose partition id is at most 1. The config is not consulted.
+   */
+  public static class Partition0Or1Filter implements SystemStreamPartitionMatcher {
+    @Override
+    public Set<SystemStreamPartition> filter(Set<SystemStreamPartition> systemStreamPartitions, Config config) {
+      Set<SystemStreamPartition> kept = new HashSet<>();
+      for (SystemStreamPartition ssp : systemStreamPartitions) {
+        if (ssp.getPartition().getPartitionId() > 1) {
+          continue;
+        }
+        kept.add(ssp);
+      }
+      return kept;
+    }
+  }
+
+  public static class GroupByContainerCountOverrideFactory implements 
TaskNameGrouperFactory {
+    @Override
+    public TaskNameGrouper build(Config config) {
+      return new GroupByContainerCountOverride(new 
JobConfig(config).getContainerCount());
+    }
+  }
+
+  private static class GroupByContainerCountOverride extends 
GroupByContainerCount {
+    public GroupByContainerCountOverride(int containerCount) {
+      super(containerCount);
+    }
+
+    @Override
+    public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> 
containersIds) {
+      throw new UnsupportedOperationException("This should not be called");
+    }
+  }
+
+  /**
+   * Returns a copy of {@code config} in which {@code systemStream} has been
+   * appended to the comma-separated input streams. The incoming config must
+   * already have a non-empty input-streams value.
+   */
+  private static Config addSystemStreamInput(SystemStream systemStream, Config config) {
+    String currentInputs = config.get(TaskConfig.INPUT_STREAMS);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(currentInputs));
+    String updatedInputs = currentInputs + "," + taskInputString(systemStream);
+    Map<String, String> updatedEntries = new HashMap<>(config);
+    updatedEntries.put(TaskConfig.INPUT_STREAMS, updatedInputs);
+    return new MapConfig(updatedEntries);
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java
new file mode 100644
index 0000000..985561b
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestJobModelHelper {
+  // streams used to build the SSPs of the task models in these tests
+  private static final SystemStream SYSTEM_STREAM0 = new 
SystemStream("system0", "stream0");
+  private static final SystemStream SYSTEM_STREAM1 = new 
SystemStream("system1", "stream1");
+  // NOTE(review): usage is outside this chunk; presumably the container count
+  // placed in the test config - confirm
+  private static final int NUM_CONTAINERS = 2;
+  // host names used when stubbing processor localities
+  private static final String HOST0 = "host-0.abc";
+  private static final String HOST1 = "host-1.abc";
+
+  // collaborator of the JobModelHelper under test (passed in by setup())
+  @Mock
+  private LocalityManager localityManager;
+  // returned by localityManager.readLocality() (stubbed in setup())
+  @Mock
+  private LocalityModel localityModel;
+  // collaborator of the JobModelHelper under test (passed in by setup())
+  @Mock
+  private TaskAssignmentManager taskAssignmentManager;
+  // collaborator of the JobModelHelper under test (passed in by setup())
+  @Mock
+  private TaskPartitionAssignmentManager taskPartitionAssignmentManager;
+  // collaborator of the JobModelHelper under test (passed in by setup())
+  @Mock
+  private StreamMetadataCache streamMetadataCache;
+  // collaborator of the JobModelHelper under test (passed in by setup())
+  @Mock
+  private JobModelCalculator jobModelCalculator;
+  @Mock
+  private Map<TaskName, Integer> changelogPartitionMapping;
+  // NOTE(review): usage is outside this chunk; presumably captures the
+  // GrouperMetadata handed to the calculator - confirm
+  @Captor
+  private ArgumentCaptor<GrouperMetadata> grouperMetadataArgumentCaptor;
+
+  // object under test; re-created for every test in setup()
+  private JobModelHelper jobModelHelper;
+
+  /**
+   * Initializes the annotated mocks, makes the locality manager return the
+   * mocked locality model, and builds the {@link JobModelHelper} under test
+   * from the mocked collaborators.
+   */
+  @Before
+  public void setup() {
+    // must run first: the fields below are null until the mocks are created
+    MockitoAnnotations.initMocks(this);
+    when(localityManager.readLocality()).thenReturn(localityModel);
+    jobModelHelper = new JobModelHelper(localityManager, taskAssignmentManager,
+        taskPartitionAssignmentManager, streamMetadataCache, jobModelCalculator);
+  }
+
+  /**
+   * Runs the helper without stubbing any old task assignments
+   * (setupOldTaskAssignments is not called) and checks the resulting grouper
+   * metadata and task assignments.
+   * NOTE(review): runAndCheckNewJobModel and the verify* helpers are defined
+   * outside this chunk; the meaning of their arguments is inferred from their
+   * names - confirm.
+   */
+  @Test
+  public void testNoPreviousJob() {
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    runAndCheckNewJobModel(config(), containerModels);
+    // the three maps passed after the processor locality are all empty here
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), ImmutableMap.of(), 
ImmutableMap.of(), ImmutableMap.of());
+    verifyNewTaskAssignments(ImmutableSet.of(), allSSPs(containerModels), 
containerToTaskToMode(containerModels),
+        sspToTasks(containerModels));
+  }
+
+  /**
+   * Uses the same container models for the old task assignments and for the
+   * new job model.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * {@code null} arguments presumably mean "no deletion expected" - confirm.
+   */
+  @Test
+  public void testSameJobModelAsPrevious() {
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    setupOldTaskAssignments(containerModels);
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), 
anyHostTaskNameToLocationId(4),
+        activeTaskToSSPs(containerModels), 
activeTaskToContainer(containerModels));
+    verifyNewTaskAssignments(null, null, 
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * The old assignments have a single container "0" with task 0, and a
+   * processor locality is stubbed for container "0" on HOST0. The new job
+   * model has two containers with four tasks. The grouper metadata is expected
+   * to place processor "0" and task 0 on HOST0.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * meaning of their arguments is inferred from their names - confirm.
+   */
+  @Test
+  public void testNewContainerWithProcessorLocality() {
+    Map<String, ContainerModel> oldContainerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))));
+    Map<String, ProcessorLocality> processorLocalities = ImmutableMap.of("0", 
processorLocality("0", HOST0));
+    
when(this.localityModel.getProcessorLocalities()).thenReturn(processorLocalities);
+    setupOldTaskAssignments(oldContainerModels);
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(processorIdToLocationId(ImmutableMap.of("0", HOST0)),
+        ImmutableMap.of(taskName(0), new LocationId(HOST0)), 
activeTaskToSSPs(oldContainerModels),
+        activeTaskToContainer(oldContainerModels));
+    verifyNewTaskAssignments(taskNameStrings(oldContainerModels, 
TaskMode.Active), allSSPs(containerModels),
+        containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * Processor localities are stubbed for both containers ("0" on HOST0 and "1"
+   * on HOST1), and the old assignments equal the new job model. The grouper
+   * metadata is expected to place tasks 0 and 2 on HOST0 and tasks 1 and 3 on
+   * HOST1, matching the containers that hold them.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * meaning of their arguments is inferred from their names - confirm.
+   */
+  @Test
+  public void testAllProcessorLocalityExists() {
+    Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 
0), taskName(2), taskModel(2, 2, 2))), "1",
+        new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 
1), taskName(3), taskModel(3, 3))));
+    Map<String, ProcessorLocality> processorLocalities =
+        ImmutableMap.of("0", processorLocality("0", HOST0), "1", 
processorLocality("1", HOST1));
+    
when(this.localityModel.getProcessorLocalities()).thenReturn(processorLocalities);
+    setupOldTaskAssignments(containerModels);
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(processorIdToLocationId(ImmutableMap.of("0", HOST0, 
"1", HOST1)),
+        ImmutableMap.of(taskName(0), new LocationId(HOST0), taskName(1), new 
LocationId(HOST1), taskName(2),
+            new LocationId(HOST0), taskName(3), new LocationId(HOST1)), 
activeTaskToSSPs(containerModels),
+        activeTaskToContainer(containerModels));
+    verifyNewTaskAssignments(null, null, 
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * The old assignments have tasks 0 and 1 each reading one partition of
+   * SYSTEM_STREAM0. In the new job model both tasks additionally read
+   * partition 0 of SYSTEM_STREAM1, i.e. the same SSP is shared by two tasks.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * meaning of their arguments is inferred from their names - confirm.
+   */
+  @Test
+  public void testAddBroadcastInput() {
+    Map<String, ContainerModel> oldContainerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 
1))));
+    setupOldTaskAssignments(oldContainerModels);
+    TaskModel taskModel0 = taskModel(0, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+        new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+    TaskModel taskModel1 = taskModel(1, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+        new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel0)), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1)));
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), 
anyHostTaskNameToLocationId(2),
+        activeTaskToSSPs(oldContainerModels), 
activeTaskToContainer(oldContainerModels));
+    verifyNewTaskAssignments(null, null, 
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * Both the old assignments and the new job model use the same container
+   * models, in which tasks 0 and 1 share partition 0 of SYSTEM_STREAM1.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * meaning of their arguments is inferred from their names - confirm.
+   */
+  @Test
+  public void testExistingBroadcastInput() {
+    TaskModel taskModel0 = taskModel(0, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+        new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+    TaskModel taskModel1 = taskModel(1, ImmutableSet.of(new 
SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+        new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel0)), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1)));
+    setupOldTaskAssignments(containerModels);
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), 
anyHostTaskNameToLocationId(2),
+        activeTaskToSSPs(containerModels), 
activeTaskToContainer(containerModels));
+    verifyNewTaskAssignments(null, null, 
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * The old assignments contain only the active containers "0" and "1". The
+   * new job model adds the standby containers "0-0" and "1-0", each holding
+   * one standby task.
+   * NOTE(review): the verify* helpers are defined outside this chunk; the
+   * meaning of their arguments is inferred from their names - confirm.
+   */
+  @Test
+  public void testNewStandbyContainers() {
+    Map<String, ContainerModel> oldContainerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 
1))));
+    setupOldTaskAssignments(oldContainerModels);
+    Map<String, ContainerModel> containerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 
1))), "0-0",
+            new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0), 
standbyTaskModel(0, 0))), "1-0",
+            new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0), 
standbyTaskModel(1, 0))));
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), 
anyHostTaskNameToLocationId(2),
+        activeTaskToSSPs(oldContainerModels), 
activeTaskToContainer(oldContainerModels));
+    // TaskAssignmentManager.deleteTaskContainerMappings is called once due to 
change in standby task count
+    verifyNewTaskAssignments(ImmutableSet.of(), null, 
containerToTaskToMode(containerModels),
+        sspToTasks(containerModels));
+  }
+
+  /**
+   * The old assignments already contain one standby container per active
+   * container ("0-0", "1-0"). The new job model adds a second standby
+   * container per active container ("0-1", "1-1"). The old standby task names
+   * are passed to verifyNewTaskAssignments.
+   * NOTE(review): the verify* helpers are defined outside this chunk;
+   * presumably the first argument is the set of task names whose container
+   * mappings are expected to be deleted - confirm.
+   */
+  @Test
+  public void testExistingStandbyContainers() {
+    Map<String, ContainerModel> oldContainerModels =
+        ImmutableMap.of("0", new ContainerModel("0", 
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+            new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 
1))), "0-0",
+            new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0), 
standbyTaskModel(0, 0))), "1-0",
+            new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0), 
standbyTaskModel(1, 0))));
+    setupOldTaskAssignments(oldContainerModels);
+    Map<String, ContainerModel> containerModels = new 
ImmutableMap.Builder<String, ContainerModel>().put("0",
+        new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0))))
+        .put("1", new ContainerModel("1", ImmutableMap.of(taskName(1), 
taskModel(1, 1))))
+        .put("0-0", new ContainerModel("0-0", 
ImmutableMap.of(standbyTaskName(0, 0), standbyTaskModel(0, 0))))
+        .put("1-0", new ContainerModel("1-0", 
ImmutableMap.of(standbyTaskName(1, 0), standbyTaskModel(1, 0))))
+        .put("0-1", new ContainerModel("0-1", 
ImmutableMap.of(standbyTaskName(0, 1), standbyTaskModel(0, 1))))
+        .put("1-1", new ContainerModel("1-1", 
ImmutableMap.of(standbyTaskName(1, 1), standbyTaskModel(1, 1))))
+        .build();
+    runAndCheckNewJobModel(config(), containerModels);
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), 
anyHostTaskNameToLocationId(2),
+        activeTaskToSSPs(oldContainerModels), 
activeTaskToContainer(oldContainerModels));
+    verifyNewTaskAssignments(taskNameStrings(oldContainerModels, 
TaskMode.Standby), null,
+        containerToTaskToMode(containerModels), sspToTasks(containerModels));
+  }
+
+  /**
+   * Previous assignments contain two active tasks with one standby task each; the new job model adds a third active
+   * task together with its standby task. Expects two task-to-container mapping deletions (previous active task names
+   * first, then previous standby task names), one deletion of task-partition assignments for all SSPs of the new job
+   * model, and exactly one write each of the new task-to-container mappings and task-partition assignments.
+   */
+  @Test
+  public void testNewStandbyTasks() {
+    Map<String, ContainerModel> oldContainerModels = new ImmutableMap.Builder<String, ContainerModel>()
+        .put("0", new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0))))
+        .put("1", new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1))))
+        .put("0-0", new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0), standbyTaskModel(0, 0))))
+        .put("1-0", new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0), standbyTaskModel(1, 0))))
+        .build();
+    setupOldTaskAssignments(oldContainerModels);
+
+    Map<String, ContainerModel> containerModels = new ImmutableMap.Builder<String, ContainerModel>()
+        .put("0", new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0))))
+        .put("1", new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1))))
+        .put("2", new ContainerModel("2", ImmutableMap.of(taskName(2), taskModel(2, 2))))
+        .put("0-0", new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0), standbyTaskModel(0, 0))))
+        .put("1-0", new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0), standbyTaskModel(1, 0))))
+        .put("2-0", new ContainerModel("2-0", ImmutableMap.of(standbyTaskName(2, 0), standbyTaskModel(2, 0))))
+        .build();
+    runAndCheckNewJobModel(config(), containerModels);
+
+    verifyGrouperMetadata(anyHostProcessorIdToLocationId(), anyHostTaskNameToLocationId(2),
+        activeTaskToSSPs(oldContainerModels), activeTaskToContainer(oldContainerModels));
+
+    //noinspection unchecked: ArgumentCaptor doesn't ideally handle multiple levels of generics, so need cast
+    ArgumentCaptor<Iterable<String>> deleteTaskContainerMappingsCaptor =
+        (ArgumentCaptor<Iterable<String>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterable.class);
+    verify(this.taskAssignmentManager, times(2)).deleteTaskContainerMappings(
+        deleteTaskContainerMappingsCaptor.capture());
+    List<Iterable<String>> deletedTaskNames = deleteTaskContainerMappingsCaptor.getAllValues();
+    // first deletion carries the previous active tasks, second one the previous standby tasks
+    assertEquals(taskNameStrings(oldContainerModels, TaskMode.Active), ImmutableSet.copyOf(deletedTaskNames.get(0)));
+    assertEquals(taskNameStrings(oldContainerModels, TaskMode.Standby), ImmutableSet.copyOf(deletedTaskNames.get(1)));
+
+    // each specific verification is followed by an any() verification to rule out additional calls
+    verify(this.taskPartitionAssignmentManager).delete(allSSPs(containerModels));
+    verify(this.taskPartitionAssignmentManager).delete(any());
+    verify(this.taskAssignmentManager).writeTaskContainerMappings(containerToTaskToMode(containerModels));
+    verify(this.taskAssignmentManager).writeTaskContainerMappings(any());
+    verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(sspToTasks(containerModels));
+    verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(any());
+  }
+
+  /**
+   * Stubs the job model calculator to return a job model built from the given config and container models (capturing
+   * the grouper metadata passed to it), invokes {@code newJobModel} on the helper under test, and asserts that the
+   * stubbed job model is what comes back.
+   */
+  private void runAndCheckNewJobModel(Config config, Map<String, ContainerModel> containerModels) {
+    JobModel expectedJobModel = new JobModel(config, containerModels);
+    when(this.jobModelCalculator.calculateJobModel(eq(config), eq(this.changelogPartitionMapping),
+        eq(this.streamMetadataCache), this.grouperMetadataArgumentCaptor.capture())).thenReturn(expectedJobModel);
+    assertEquals(expectedJobModel, this.jobModelHelper.newJobModel(config, this.changelogPartitionMapping));
+  }
+
+  /**
+   * Stubs the assignment managers so that their read methods return the assignments derived from the given
+   * (previous) container models.
+   */
+  private void setupOldTaskAssignments(Map<String, ContainerModel> oldContainerModels) {
+    Map<String, String> oldTaskToContainer = taskNameStringToContainer(oldContainerModels);
+    Map<TaskName, TaskMode> oldTaskModes = taskNameToMode(oldContainerModels);
+    Map<SystemStreamPartition, List<String>> oldSspToTasks = sspToTasks(oldContainerModels);
+
+    when(this.taskAssignmentManager.readTaskAssignment()).thenReturn(oldTaskToContainer);
+    when(this.taskAssignmentManager.readTaskModes()).thenReturn(oldTaskModes);
+    when(this.taskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(oldSspToTasks);
+  }
+
+  /**
+   * Builds a config whose only entry sets the container count to {@code NUM_CONTAINERS}.
+   */
+  private static Config config() {
+    return new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, Integer.toString(NUM_CONTAINERS)));
+  }
+
+  /**
+   * Builds the name of the active task with the given id.
+   */
+  private static TaskName taskName(int id) {
+    String name = "Partition " + id;
+    return new TaskName(name);
+  }
+
+  /**
+   * Builds a task model for task {@code id} that consumes a single partition of SYSTEM_STREAM0.
+   */
+  private static TaskModel taskModel(int id, int partitionForSystemStream0) {
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0));
+    return taskModel(id, ImmutableSet.of(ssp0));
+  }
+
+  /**
+   * Builds a task model for task {@code id} that consumes one partition of SYSTEM_STREAM0 and one partition of
+   * SYSTEM_STREAM1.
+   */
+  private static TaskModel taskModel(int id, int partitionForSystemStream0, int partitionForSystemStream1) {
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM_STREAM1, new Partition(partitionForSystemStream1));
+    return taskModel(id, ImmutableSet.of(ssp0, ssp1));
+  }
+
+  /**
+   * Builds a task model named via {@link #taskName(int)} for the given SSPs; {@code id} is also used as the id of
+   * the {@link Partition} passed to the task model.
+   */
+  private static TaskModel taskModel(int id, Set<SystemStreamPartition> systemStreamPartitions) {
+    TaskName name = taskName(id);
+    Partition partition = new Partition(id);
+    return new TaskModel(name, systemStreamPartitions, partition);
+  }
+
+  /**
+   * Builds the name of a standby task from the id of its active task and the id of the standby replica.
+   */
+  private static TaskName standbyTaskName(int taskId, int standbyId) {
+    String name = String.format("Standby-Partition %s-%s", taskId, standbyId);
+    return new TaskName(name);
+  }
+
+  /**
+   * Builds a standby-mode task model named via {@link #standbyTaskName(int, int)}. The single input SSP (on
+   * SYSTEM_STREAM0) and the {@link Partition} passed to the task model both use {@code taskId} as the partition id.
+   */
+  private static TaskModel standbyTaskModel(int taskId, int standbyId) {
+    // reuse the shared name helper so the container-model map key and the task model's own name cannot drift apart
+    TaskName taskName = standbyTaskName(taskId, standbyId);
+    Set<SystemStreamPartition> ssps =
+        ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(taskId)));
+    return new TaskModel(taskName, ssps, new Partition(taskId), TaskMode.Standby);
+  }
+
+  /**
+   * Collects the SSPs of every task in the given container models into one set.
+   */
+  private static Set<SystemStreamPartition> allSSPs(Map<String, ContainerModel> containerModels) {
+    return taskModelStream(containerModels)
+        .map(TaskModel::getSystemStreamPartitions)
+        .flatMap(ssps -> ssps.stream())
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Maps each key of the given container models to a map from task name string to task mode for that container's
+   * tasks.
+   */
+  private static Map<String, Map<String, TaskMode>> containerToTaskToMode(
+      Map<String, ContainerModel> containerModels) {
+    return containerModels.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, containerEntry -> {
+      Map<TaskName, TaskModel> tasks = containerEntry.getValue().getTasks();
+      return tasks.entrySet()
+          .stream()
+          .collect(Collectors.toMap(task -> task.getKey().getTaskName(), task -> task.getValue().getTaskMode()));
+    }));
+  }
+
+  /**
+   * Streams the task models of all given container models.
+   */
+  private static Stream<TaskModel> taskModelStream(Map<String, ContainerModel> containerModels) {
+    return containerModels.values()
+        .stream()
+        .map(ContainerModel::getTasks)
+        .flatMap(tasks -> tasks.values().stream());
+  }
+
+  /**
+   * Returns the names of all tasks in the given container models whose mode equals {@code taskMode}.
+   */
+  private static Set<TaskName> taskNames(Map<String, ContainerModel> containerModels, TaskMode taskMode) {
+    Stream<TaskModel> tasksInMode =
+        taskModelStream(containerModels).filter(candidate -> taskMode.equals(candidate.getTaskMode()));
+    return tasksInMode.map(TaskModel::getTaskName).collect(Collectors.toSet());
+  }
+
+  /**
+   * Same as {@link #taskNames(Map, TaskMode)}, but with each task name converted to its string form.
+   */
+  private static Set<String> taskNameStrings(Map<String, ContainerModel> containerModels, TaskMode taskMode) {
+    Set<TaskName> names = taskNames(containerModels, taskMode);
+    return names.stream().map(TaskName::getTaskName).collect(Collectors.toSet());
+  }
+
+  /**
+   * Maps the name of every active-mode task in the given container models to a list copy of its SSPs.
+   */
+  private static Map<TaskName, List<SystemStreamPartition>> activeTaskToSSPs(
+      Map<String, ContainerModel> containerModels) {
+    Stream<TaskModel> activeTasks =
+        taskModelStream(containerModels).filter(candidate -> TaskMode.Active.equals(candidate.getTaskMode()));
+    return activeTasks.collect(Collectors.toMap(TaskModel::getTaskName,
+      activeTask -> ImmutableList.copyOf(activeTask.getSystemStreamPartitions())));
+  }
+
+  /**
+   * Maps the name of every task in the given container models to its task mode.
+   */
+  private static Map<TaskName, TaskMode> taskNameToMode(Map<String, ContainerModel> containerModels) {
+    Stream<TaskModel> allTasks = taskModelStream(containerModels);
+    return allTasks.collect(Collectors.toMap(TaskModel::getTaskName, TaskModel::getTaskMode));
+  }
+
+  /**
+   * Maps the name of every active-mode task in the given container models to the id of the container model that
+   * holds it.
+   */
+  private static Map<TaskName, String> activeTaskToContainer(Map<String, ContainerModel> containerModels) {
+    Map<TaskName, String> taskToContainerMap = new HashMap<>();
+    for (ContainerModel containerModel : containerModels.values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        if (TaskMode.Active.equals(taskModel.getTaskMode())) {
+          taskToContainerMap.put(taskModel.getTaskName(), containerModel.getId());
+        }
+      }
+    }
+    return taskToContainerMap;
+  }
+
+  /**
+   * Same as {@link #activeTaskToContainer(Map)}, but keyed by the string form of the task name.
+   */
+  private static Map<String, String> taskNameStringToContainer(Map<String, ContainerModel> containerModels) {
+    Map<TaskName, String> activeAssignments = activeTaskToContainer(containerModels);
+    return activeAssignments.entrySet()
+        .stream()
+        .collect(Collectors.toMap(assignment -> assignment.getKey().getTaskName(), Map.Entry::getValue));
+  }
+
+  /**
+   * Maps every SSP consumed in the given container models to the task name strings of the tasks consuming it, in
+   * the order the tasks are encountered.
+   */
+  private static Map<SystemStreamPartition, List<String>> sspToTasks(Map<String, ContainerModel> containerModels) {
+    Map<SystemStreamPartition, List<String>> sspToTasksMap = new HashMap<>();
+    taskModelStream(containerModels).forEach(taskModel -> {
+      String taskNameString = taskModel.getTaskName().getTaskName();
+      // computeIfAbsent creates the list only for SSPs seen for the first time and avoids a second lookup
+      taskModel.getSystemStreamPartitions()
+          .forEach(ssp -> sspToTasksMap.computeIfAbsent(ssp, unused -> new ArrayList<>()).add(taskNameString));
+    });
+    return sspToTasksMap;
+  }
+
+  /**
+   * Starts from the "ANY_HOST" locality of every processor and replaces the location of each processor id present
+   * in {@code processorIdToHostOverrides} with a location for the given host.
+   */
+  private static Map<String, LocationId> processorIdToLocationId(Map<String, String> processorIdToHostOverrides) {
+    Map<String, LocationId> processorLocality = new HashMap<>(anyHostProcessorIdToLocationId());
+    for (Map.Entry<String, String> hostOverride : processorIdToHostOverrides.entrySet()) {
+      processorLocality.put(hostOverride.getKey(), new LocationId(hostOverride.getValue()));
+    }
+    return processorLocality;
+  }
+
+  /**
+   * Maps the processor ids "0" up to (but excluding) {@code NUM_CONTAINERS} to an "ANY_HOST" location each.
+   */
+  private static Map<String, LocationId> anyHostProcessorIdToLocationId() {
+    return IntStream.range(0, NUM_CONTAINERS)
+        .mapToObj(processorIndex -> Integer.toString(processorIndex))
+        .collect(Collectors.toMap(processorId -> processorId, processorId -> new LocationId("ANY_HOST")));
+  }
+
+  /**
+   * Maps the names of the active tasks with ids 0 up to (but excluding) {@code numTasks} to an "ANY_HOST" location
+   * each.
+   */
+  private static Map<TaskName, LocationId> anyHostTaskNameToLocationId(int numTasks) {
+    return IntStream.range(0, numTasks)
+        .mapToObj(TestJobModelHelper::taskName)
+        .collect(Collectors.toMap(name -> name, name -> new LocationId("ANY_HOST")));
+  }
+
+  /**
+   * Checks the grouper metadata captured from the job model calculator invocation against the expected processor
+   * locality, task locality, previous task-to-SSP assignment and previous task-to-processor assignment.
+   */
+  private void verifyGrouperMetadata(Map<String, LocationId> processorLocality,
+      Map<TaskName, LocationId> taskLocality,
+      Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignment,
+      Map<TaskName, String> previousTaskToProcessorAssignment) {
+    GrouperMetadata captured = this.grouperMetadataArgumentCaptor.getValue();
+    assertEquals(processorLocality, captured.getProcessorLocality());
+    assertEquals(taskLocality, captured.getTaskLocality());
+    // SSP lists are compared without regard to order
+    assertPreviousTaskToSSPAssignment(previousTaskToSSPAssignment, captured.getPreviousTaskToSSPAssignment());
+    assertEquals(previousTaskToProcessorAssignment, captured.getPreviousTaskToProcessorAssignment());
+  }
+
+  /**
+   * Verifies the interactions with the task assignment manager and the task-partition assignment manager.
+   *
+   * @param deleteTaskContainerMappings task names expected in the single deleteTaskContainerMappings call, or null
+   *                                    if no such call is expected
+   * @param taskPartitionAssignmentDeletes SSPs expected in the single delete call, or null if no such call is expected
+   * @param writeTaskContainerMappings expected argument of the single writeTaskContainerMappings call
+   * @param writeTaskPartitionAssignments expected argument of the single writeTaskPartitionAssignments call
+   */
+  private void verifyNewTaskAssignments(Set<String> deleteTaskContainerMappings,
+      Set<SystemStreamPartition> taskPartitionAssignmentDeletes,
+      Map<String, Map<String, TaskMode>> writeTaskContainerMappings,
+      Map<SystemStreamPartition, List<String>> writeTaskPartitionAssignments) {
+    if (deleteTaskContainerMappings == null) {
+      verify(this.taskAssignmentManager, never()).deleteTaskContainerMappings(any());
+    } else {
+      //noinspection unchecked: ArgumentCaptor doesn't ideally handle multiple levels of generics, so need cast
+      ArgumentCaptor<Iterable<String>> deletedTaskNamesCaptor =
+          (ArgumentCaptor<Iterable<String>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterable.class);
+      verify(this.taskAssignmentManager).deleteTaskContainerMappings(deletedTaskNamesCaptor.capture());
+      // order of Iterable argument shouldn't matter
+      assertEquals(deleteTaskContainerMappings, ImmutableSet.copyOf(deletedTaskNamesCaptor.getValue()));
+    }
+
+    if (taskPartitionAssignmentDeletes == null) {
+      verify(this.taskPartitionAssignmentManager, never()).delete(any());
+    } else {
+      verify(this.taskPartitionAssignmentManager).delete(taskPartitionAssignmentDeletes);
+      // verifies no calls to delete other than the one above
+      verify(this.taskPartitionAssignmentManager).delete(any());
+    }
+
+    verify(this.taskAssignmentManager).writeTaskContainerMappings(writeTaskContainerMappings);
+    // verifies no calls to writeTaskContainerMappings other than the one above
+    verify(this.taskAssignmentManager).writeTaskContainerMappings(any());
+
+    verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(writeTaskPartitionAssignments);
+    // verifies no calls to writeTaskPartitionAssignments other than the one above
+    verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(any());
+  }
+
+  /**
+   * Asserts that both maps contain the same task names and that, for every task, the SSP lists hold the same
+   * elements without duplicates. Order of SSPs in values of the map shouldn't matter, and the order is also
+   * non-deterministic, so just check the contents of the lists.
+   */
+  private static void assertPreviousTaskToSSPAssignment(Map<TaskName, List<SystemStreamPartition>> expected,
+      Map<TaskName, List<SystemStreamPartition>> actual) {
+    // compare the keys (not just the sizes) so that a task missing from actual fails as an assertion instead of a
+    // NullPointerException when the null list is copied into a set below
+    assertEquals(expected.keySet(), actual.keySet());
+    expected.forEach((taskName, sspList) -> assertContentsEqualNoDuplicates(sspList, actual.get(taskName)));
+  }
+
+  /**
+   * Asserts that neither list contains duplicates and that both lists hold the same elements, ignoring order.
+   */
+  private static <T> void assertContentsEqualNoDuplicates(List<T> expected, List<T> actual) {
+    Set<T> distinctExpected = ImmutableSet.copyOf(expected);
+    Set<T> distinctActual = ImmutableSet.copyOf(actual);
+    // a set smaller than its source list means the list had duplicates
+    assertEquals(distinctExpected.size(), expected.size());
+    assertEquals(distinctActual.size(), actual.size());
+    assertEquals(distinctExpected, distinctActual);
+  }
+
+  /**
+   * Creates a mocked {@link ProcessorLocality} whose {@code id()} and {@code host()} return the given values.
+   */
+  private static ProcessorLocality processorLocality(String id, String host) {
+    ProcessorLocality mockedLocality = mock(ProcessorLocality.class);
+    when(mockedLocality.id()).thenReturn(id);
+    when(mockedLocality.host()).thenReturn(host);
+    return mockedLocality;
+  }
+}
\ No newline at end of file
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
deleted file mode 100644
index 83de0cf..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.*;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.RegExTopicGenerator;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.container.grouper.task.GroupByContainerCount;
-import org.apache.samza.container.grouper.task.GrouperMetadata;
-import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
-import org.apache.samza.container.grouper.task.TaskAssignmentManager;
-import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.ProcessorLocality;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.LocalityModel;
-import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.runtime.LocationId;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.testUtils.MockHttpServer;
-import org.apache.samza.util.ConfigUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import scala.collection.JavaConversions;
-
-/**
- * Unit tests for {@link JobModelManager}
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class, 
ConfigUtil.class})
-public class TestJobModelManager {
-  private final TaskAssignmentManager mockTaskManager = 
mock(TaskAssignmentManager.class);
-  private final Map<String, Map<String, String>> localityMappings = new 
HashMap<>();
-  private final HttpServer server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
-  private final SystemStream inputStream = new SystemStream("test-system", 
"test-stream");
-  private final SystemStreamMetadata.SystemStreamPartitionMetadata 
mockSspMetadata = 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class);
-  private final Map<Partition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> mockSspMetadataMap = 
Collections.singletonMap(new Partition(0), mockSspMetadata);
-  private final SystemStreamMetadata mockStreamMetadata = 
mock(SystemStreamMetadata.class);
-  private final scala.collection.immutable.Map<SystemStream, 
SystemStreamMetadata> mockStreamMetadataMap = new 
scala.collection.immutable.Map.Map1(inputStream, mockStreamMetadata);
-  private final StreamMetadataCache mockStreamMetadataCache = 
mock(StreamMetadataCache.class);
-  private final scala.collection.immutable.Set<SystemStream> inputStreamSet = 
JavaConversions.asScalaSet(Collections.singleton(inputStream)).toSet();
-
-  private JobModelManager jobModelManager;
-
-  @Before
-  public void setup() throws Exception {
-    when(mockStreamMetadataCache.getStreamMetadata(argThat(new 
ArgumentMatcher<scala.collection.immutable.Set<SystemStream>>() {
-      @Override
-      public boolean matches(Object argument) {
-        scala.collection.immutable.Set<SystemStream> set = 
(scala.collection.immutable.Set<SystemStream>) argument;
-        return set.equals(inputStreamSet);
-      }
-    }), anyBoolean())).thenReturn(mockStreamMetadataMap);
-    
when(mockStreamMetadata.getSystemStreamPartitionMetadata()).thenReturn(mockSspMetadataMap);
-    
PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(mockTaskManager);
-    
when(mockTaskManager.readTaskAssignment()).thenReturn(Collections.EMPTY_MAP);
-  }
-
-  @Test
-  public void testGetGrouperMetadata() {
-    // Mocking setup.
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    TaskAssignmentManager mockTaskAssignmentManager = 
Mockito.mock(TaskAssignmentManager.class);
-    TaskPartitionAssignmentManager mockTaskPartitionAssignmentManager = 
Mockito.mock(TaskPartitionAssignmentManager.class);
-
-    SystemStreamPartition testSystemStreamPartition1 = new 
SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new 
Partition(1));
-    SystemStreamPartition testSystemStreamPartition2 = new 
SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new 
Partition(2));
-
-    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", 
"abc-affinity"))));
-
-    Map<SystemStreamPartition, List<String>> taskToSSPAssignments = 
ImmutableMap.of(testSystemStreamPartition1, ImmutableList.of("task-0", 
"task-1"),
-                                                                               
     testSystemStreamPartition2, ImmutableList.of("task-2", "task-3"));
-
-    Map<String, String> taskAssignment = ImmutableMap.of("task-0", "0");
-
-    // Mock the task to partition assignment.
-    
when(mockTaskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(taskToSSPAssignments);
-
-    // Mock the container to task assignment.
-    
when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
-    
when(mockTaskAssignmentManager.readTaskModes()).thenReturn(ImmutableMap.of(new 
TaskName("task-0"), TaskMode.Active, new TaskName("task-1"), TaskMode.Active, 
new TaskName("task-2"), TaskMode.Active, new TaskName("task-3"), 
TaskMode.Active));
-
-    GrouperMetadataImpl grouperMetadata = 
JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, 
mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);
-
-    verify(mockLocalityManager).readLocality();
-    verify(mockTaskAssignmentManager).readTaskAssignment();
-
-    Assert.assertEquals(ImmutableMap.of("0", new LocationId("abc-affinity")), 
grouperMetadata.getProcessorLocality());
-    Assert.assertEquals(ImmutableMap.of(new TaskName("task-0"), new 
LocationId("abc-affinity")), grouperMetadata.getTaskLocality());
-
-    Map<TaskName, List<SystemStreamPartition>> expectedTaskToSSPAssignments = 
ImmutableMap.of(new TaskName("task-0"), 
ImmutableList.of(testSystemStreamPartition1),
-                                                                               
               new TaskName("task-1"), 
ImmutableList.of(testSystemStreamPartition1),
-                                                                               
               new TaskName("task-2"), 
ImmutableList.of(testSystemStreamPartition2),
-                                                                               
               new TaskName("task-3"), 
ImmutableList.of(testSystemStreamPartition2));
-    Assert.assertEquals(expectedTaskToSSPAssignments, 
grouperMetadata.getPreviousTaskToSSPAssignment());
-  }
-
-  @Test
-  public void testGetProcessorLocalityAllEntriesExisting() {
-    Config config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
-
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of(
-        "0", new ProcessorLocality("0", "0-affinity"),
-        "1", new ProcessorLocality("1", "1-affinity"))));
-
-    Map<String, LocationId> processorLocality = 
JobModelManager.getProcessorLocality(config, mockLocalityManager);
-
-    verify(mockLocalityManager).readLocality();
-    ImmutableMap<String, LocationId> expected =
-        ImmutableMap.of("0", new LocationId("0-affinity"), "1", new 
LocationId("1-affinity"));
-    Assert.assertEquals(expected, processorLocality);
-  }
-
-  @Test
-  public void testGetProcessorLocalityNewContainer() {
-    Config config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
-
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    // 2 containers, but only return 1 existing mapping
-    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", 
"abc-affinity"))));
-
-    Map<String, LocationId> processorLocality = 
JobModelManager.getProcessorLocality(config, mockLocalityManager);
-
-    verify(mockLocalityManager).readLocality();
-    ImmutableMap<String, LocationId> expected = ImmutableMap.of(
-        // found entry in existing locality
-        "0", new LocationId("abc-affinity"),
-        // no entry in existing locality
-        "1", new LocationId("ANY_HOST"));
-    Assert.assertEquals(expected, processorLocality);
-  }
-
-  @Test
-  public void testUpdateTaskAssignments() {
-    // Mocking setup.
-    JobModel mockJobModel = Mockito.mock(JobModel.class);
-    GrouperMetadataImpl mockGrouperMetadata = 
Mockito.mock(GrouperMetadataImpl.class);
-    TaskAssignmentManager mockTaskAssignmentManager = 
Mockito.mock(TaskAssignmentManager.class);
-    TaskPartitionAssignmentManager mockTaskPartitionAssignmentManager = 
Mockito.mock(TaskPartitionAssignmentManager.class);
-
-    SystemStreamPartition testSystemStreamPartition1 = new 
SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new 
Partition(1));
-    SystemStreamPartition testSystemStreamPartition2 = new 
SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new 
Partition(2));
-    SystemStreamPartition testSystemStreamPartition3 = new 
SystemStreamPartition(new SystemStream("test-system-2", "test-stream-2"), new 
Partition(1));
-    SystemStreamPartition testSystemStreamPartition4 = new 
SystemStreamPartition(new SystemStream("test-system-3", "test-stream-3"), new 
Partition(2));
-
-    Map<TaskName, TaskModel> taskModelMap = new HashMap<>();
-    taskModelMap.put(new TaskName("task-1"), new TaskModel(new 
TaskName("task-1"), ImmutableSet.of(testSystemStreamPartition1), new 
Partition(0)));
-    taskModelMap.put(new TaskName("task-2"), new TaskModel(new 
TaskName("task-2"), ImmutableSet.of(testSystemStreamPartition2), new 
Partition(1)));
-    taskModelMap.put(new TaskName("task-3"), new TaskModel(new 
TaskName("task-3"), ImmutableSet.of(testSystemStreamPartition3), new 
Partition(2)));
-    taskModelMap.put(new TaskName("task-4"), new TaskModel(new 
TaskName("task-4"), ImmutableSet.of(testSystemStreamPartition4), new 
Partition(3)));
-    ContainerModel containerModel = new ContainerModel("test-container-id", 
taskModelMap);
-    Map<String, ContainerModel> containerMapping = 
ImmutableMap.of("test-container-id", containerModel);
-
-    when(mockJobModel.getContainers()).thenReturn(containerMapping);
-    
when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new 
HashMap<>());
-    
Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMappings(Mockito.any());
-
-    JobModelManager.updateTaskAssignments(mockJobModel, 
mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, 
mockGrouperMetadata);
-
-    Set<String> taskNames = new HashSet<String>();
-    taskNames.add("task-4");
-    taskNames.add("task-2");
-    taskNames.add("task-3");
-    taskNames.add("task-1");
-
-    Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>();
-    systemStreamPartitions.add(new SystemStreamPartition(new 
SystemStream("test-system-0", "test-stream-0"), new Partition(1)));
-    systemStreamPartitions.add(new SystemStreamPartition(new 
SystemStream("test-system-1", "test-stream-1"), new Partition(2)));
-    systemStreamPartitions.add(new SystemStreamPartition(new 
SystemStream("test-system-2", "test-stream-2"), new Partition(1)));
-    systemStreamPartitions.add(new SystemStreamPartition(new 
SystemStream("test-system-3", "test-stream-3"), new Partition(2)));
-
-    // Verifications
-    verify(mockJobModel, atLeast(1)).getContainers();
-    
verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-    
verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
-        ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, 
"task-3", TaskMode.Active, "task-4", TaskMode.Active)));
-
-    // Verify that the old, stale partition mappings had been purged in the 
coordinator stream.
-    verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
-
-    // Verify that the new task to partition assignment is stored in the 
coordinator stream.
-    
verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
-        testSystemStreamPartition1, ImmutableList.of("task-1"),
-        testSystemStreamPartition2, ImmutableList.of("task-2"),
-        testSystemStreamPartition3, ImmutableList.of("task-3"),
-        testSystemStreamPartition4, ImmutableList.of("task-4")
-    ));
-  }
-
-  @Test
-  public void 
testJobModelContainsLatestTaskInputsWhenEnabledRegexTopicRewriter() {
-    ImmutableMap<String, String> rewriterConfig = ImmutableMap.of(
-        JobConfig.CONFIG_REWRITERS, "regexTopicRewriter",
-        String.format(JobConfig.CONFIG_REWRITER_CLASS, "regexTopicRewriter"), 
RegExTopicGenerator.class.getCanonicalName()
-    );
-
-    Config config = new MapConfig(rewriterConfig);
-    String taskInputMatchedRegex = inputStream.getSystem() + "." + 
inputStream.getStream();
-    Config refreshedConfig = new MapConfig(ImmutableMap.<String, 
String>builder()
-        .putAll(rewriterConfig)
-        .put(TaskConfig.INPUT_STREAMS, taskInputMatchedRegex)
-        .build()
-    );
-
-    PowerMockito.mockStatic(ConfigUtil.class);
-    PowerMockito.when(ConfigUtil.applyRewriter(config, 
"regexTopicRewriter")).thenReturn(refreshedConfig);
-
-    Map<TaskName, Integer> changeLogPartitionMapping = new HashMap<>();
-    GrouperMetadata grouperMetadata = mock(GrouperMetadata.class);
-
-    JobModel jobModel =
-        JobModelManager.readJobModel(config, changeLogPartitionMapping, 
mockStreamMetadataCache, grouperMetadata);
-
-    Assert.assertNotNull(jobModel);
-    Assert.assertEquals(taskInputMatchedRegex, 
jobModel.getConfig().get(TaskConfig.INPUT_STREAMS));
-  }
-}
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
similarity index 89%
rename from 
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
rename to 
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
index 123872b..95856fa 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.coordinator
 
-import java.util
-
 import org.apache.samza.Partition
 import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.config._
@@ -28,25 +26,25 @@ import org.apache.samza.container.{SamzaContainer, TaskName}
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import 
org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, 
MockCoordinatorStreamWrappedConsumer}
-import org.apache.samza.job.MockJobFactory
 import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.storage.ChangelogStreamManager
-import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
 import org.apache.samza.util.HttpUtil
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
-import org.mockito.Mockito.{mock, when}
 import org.scalatest.{FlatSpec, PrivateMethodTester}
 
+import java.util
 import scala.collection.JavaConverters._
-import scala.collection.immutable
 
-
-class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
+/**
+ * This used to be called TestJobCoordinator, but JobCoordinator is currently an interface, so that old name seemed
+ * incorrect. The object-under-test seems to actually be JobModelManager.
+ */
+class TestJobModelManager extends FlatSpec with PrivateMethodTester {
   /**
    * Builds a coordinator from config, and then compares it with what was
    * expected. We simulate having a checkpoint manager that has 2 task
@@ -231,29 +229,6 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     assertEquals(jobModel, coordinator.jobModel)
   }
 
-  /**
-    * Test with a JobFactory other than ProcessJobFactory or ThreadJobFactory so that
-    * JobConfing.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX does not match.
-    */
-  @Test
-  def testWithPartitionAssignmentWithMockJobFactory {
-    val config = getTestConfig(classOf[MockJobFactory])
-    val systemStream = new SystemStream("test", "stream1")
-    val streamMetadataCache = mock(classOf[StreamMetadataCache])
-    when(streamMetadataCache.getStreamMetadata(Set(systemStream), 
true)).thenReturn(
-      Map(systemStream -> new SystemStreamMetadata(systemStream.getStream,
-        Map(new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
-        ).asJava)))
-    val getInputStreamPartitions = 
PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
-    val getMatchedInputStreamPartitions = 
PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
-
-    val allSSP = JobModelManager invokePrivate 
getInputStreamPartitions(config, streamMetadataCache)
-    val matchedSSP = JobModelManager invokePrivate 
getMatchedInputStreamPartitions(config, streamMetadataCache)
-    assertEquals(matchedSSP, allSSP)
-  }
-
   def getTestConfig(clazz : Class[_]) = {
     val config = new MapConfig(Map(
       TaskConfig.CHECKPOINT_MANAGER_FACTORY -> 
classOf[MockCheckpointManagerFactory].getCanonicalName,

Reply via email to