SAMZA-1973: Unify the TaskNameGrouper interface for yarn and standalone. This patch consists of the following changes: * Unify the different methods present in the TaskNameGrouper interface. This will enable us to have a single interface method usable for both the yarn and standalone models. * Generate locationId-aware task assignment to processors in standalone. * Move the task assignment persistence logic from a custom `TaskNameGrouper` implementation to `JobModelManager`, so that this works for any kind of custom grouper. * General code cleanup in `JobModelManager`, `TaskAssignmentManager` and in other samza internal classes. * Read/write taskLocality of the processors in standalone. * Updated the existing Javadocs and added Javadocs where they were missing.
Testing: * Fixed the existing unit tests affected by the changes. * Added new unit tests for the functionality changed or added as a part of this patch. * Tested this patch with a sample job from the `hello-samza` project and verified that it works as expected. Please refer to [SEP-11](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309) for more details. Author: Shanthoosh Venkataraman <spven...@usc.edu> Author: Shanthoosh Venkataraman <svenk...@linkedin.com> Author: svenkata <svenkatara...@linkedin.com> Reviewers: Prateek M<pmahe...@linkedin.com> Closes #790 from shanthoosh/task_name_grouper_changes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5ea72584 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5ea72584 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5ea72584 Branch: refs/heads/master Commit: 5ea72584f6b92937ec130f486d6f70603b7188c2 Parents: c7e5dcb Author: Shanthoosh Venkataraman <spven...@usc.edu> Authored: Wed Dec 5 10:56:55 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Wed Dec 5 10:56:55 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/job/model/TaskModel.java | 1 - .../samza/coordinator/AzureJobCoordinator.java | 7 +- .../ClusterBasedJobCoordinator.java | 3 +- .../samza/config/ClusterManagerConfig.java | 14 +- .../grouper/task/BalancingTaskNameGrouper.java | 5 +- .../grouper/task/GroupByContainerCount.java | 228 ++++--------- .../task/GroupByContainerCountFactory.java | 3 +- .../grouper/task/GroupByContainerIds.java | 171 ++++++++-- .../container/grouper/task/GrouperMetadata.java | 58 ++++ .../grouper/task/GrouperMetadataImpl.java | 72 +++++ .../grouper/task/TaskAssignmentManager.java | 3 - .../samza/container/grouper/task/TaskGroup.java | 85 +++++ .../container/grouper/task/TaskNameGrouper.java | 39 ++- .../grouper/task/TaskNameGrouperFactory.java | 2 +- 
.../samza/execution/ExecutionPlanner.java | 2 +- .../apache/samza/processor/StreamProcessor.java | 3 +- .../samza/runtime/LocalContainerRunner.java | 18 +- .../standalone/PassthroughJobCoordinator.java | 30 +- .../apache/samza/storage/StorageRecovery.java | 5 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 74 ++++- .../main/java/org/apache/samza/zk/ZkUtils.java | 27 ++ .../apache/samza/container/SamzaContainer.scala | 12 +- .../samza/coordinator/JobModelManager.scala | 231 +++++++++---- .../samza/job/local/ProcessJobFactory.scala | 2 +- .../samza/job/local/ThreadJobFactory.scala | 2 +- .../grouper/task/TestGroupByContainerCount.java | 320 ++++++------------- .../grouper/task/TestGroupByContainerIds.java | 292 +++++++++++++++-- .../grouper/task/TestTaskAssignmentManager.java | 3 - .../samza/container/mock/ContainerMocks.java | 6 +- .../coordinator/JobModelManagerTestUtil.java | 17 +- .../samza/coordinator/TestJobModelManager.java | 114 ++++++- .../java/org/apache/samza/zk/TestZkUtils.java | 73 ++++- .../samza/container/TestSamzaContainer.scala | 38 ++- .../apache/samza/test/framework/TestRunner.java | 2 +- .../processor/TestZkStreamProcessorBase.java | 2 - .../processor/TestZkLocalApplicationRunner.java | 10 +- .../samza/validation/YarnJobValidationTool.java | 6 +- .../webapp/TestApplicationMasterRestClient.java | 4 +- 38 files changed, 1348 insertions(+), 636 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java index 7ee7609..36917cf 100644 --- a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java +++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java @@ -99,7 
+99,6 @@ public class TaskModel implements Comparable<TaskModel> { } @Override - public String toString() { return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]"; } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java index 96f628c..076ab54 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -30,6 +30,8 @@ import org.apache.samza.config.TaskConfig; import org.apache.samza.container.TaskName; import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper; import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory; +import org.apache.samza.container.grouper.task.GrouperMetadata; +import org.apache.samza.container.grouper.task.GrouperMetadataImpl; import org.apache.samza.coordinator.data.BarrierState; import org.apache.samza.coordinator.data.ProcessorEntity; import org.apache.samza.coordinator.scheduler.HeartbeatScheduler; @@ -54,7 +56,6 @@ import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -365,8 +366,8 @@ public class AzureJobCoordinator implements JobCoordinator { } // Generate the new JobModel - JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), - null, streamMetadataCache, currentProcessorIds); + GrouperMetadata grouperMetadata = new 
GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata); LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion); // Publish the new job model http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 4c5a34b..0eddbf2 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -185,8 +185,7 @@ public class ClusterBasedJobCoordinator { // build a JobModelManager and ChangelogStreamManager and perform partition assignments. 
changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - jobModelManager = - JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); + jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metrics); config = jobModelManager.jobModel().getConfig(); hasDurableStores = new StorageConfig(config).hasDurableStores(); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java index cb86a58..eda1be8 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java @@ -52,10 +52,14 @@ public class ClusterManagerConfig extends MapConfig { private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000; /** - * Flag to indicate if host-affinity is enabled for the job or not + * NOTE: This field is deprecated. 
*/ public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled"; - public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled"; + + /** + * Flag to indicate if host-affinity is enabled for the job or not + */ + public static final String JOB_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled"; /** * Number of CPU cores to request from the cluster manager per container @@ -145,10 +149,10 @@ public class ClusterManagerConfig extends MapConfig { } public boolean getHostAffinityEnabled() { - if (containsKey(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED)) { - return getBoolean(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED); + if (containsKey(JOB_HOST_AFFINITY_ENABLED)) { + return getBoolean(JOB_HOST_AFFINITY_ENABLED); } else if (containsKey(HOST_AFFINITY_ENABLED)) { - log.info("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, CLUSTER_MANAGER_HOST_AFFINITY_ENABLED); + log.warn("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, JOB_HOST_AFFINITY_ENABLED); return getBoolean(HOST_AFFINITY_ENABLED); } else { return false; http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java index f8295c8..91eab54 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java @@ -54,5 +54,8 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper { * @param localityManager provides a persisted task to container map to use as a baseline * @return the grouped tasks in the form of 
ContainerModels */ - Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager); + @Deprecated + default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) { + return group(tasks); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java index 759f82e..8a741db 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java @@ -27,19 +27,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; - -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.container.LocalityManager; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; -import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.SamzaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Group the SSP taskNames by dividing the number of taskNames into the number * of containers (n) and assigning n taskNames to each container as returned by @@ -51,19 +45,21 @@ import org.slf4j.LoggerFactory; * TODO: SAMZA-1197 - need to modify balance to work with processorId strings */ public class GroupByContainerCount implements BalancingTaskNameGrouper { - private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class); + private static final Logger LOG = 
LoggerFactory.getLogger(GroupByContainerCount.class); private final int containerCount; - private final Config config; - public GroupByContainerCount(Config config) { - this.containerCount = new JobConfig(config).getContainerCount(); - this.config = config; - if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container"); + public GroupByContainerCount(int containerCount) { + if (containerCount <= 0) { + throw new IllegalArgumentException("Must have at least one container"); + } + this.containerCount = containerCount; } + /** + * {@inheritDoc} + */ @Override public Set<ContainerModel> group(Set<TaskModel> tasks) { - validateTasks(tasks); // Sort tasks by taskName. @@ -89,79 +85,63 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { return Collections.unmodifiableSet(containerModels); } + /** + * {@inheritDoc} + */ @Override - public Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) { - + public Set<ContainerModel> group(Set<TaskModel> tasks, GrouperMetadata grouperMetadata) { validateTasks(tasks); - if (localityManager == null) { - log.info("Locality manager is null. Cannot read or write task assignments. Invoking grouper."); + List<TaskGroup> containers = getPreviousContainers(grouperMetadata, tasks.size()); + if (containers == null || containers.size() == 1 || containerCount == 1) { + LOG.info("Balancing does not apply. Invoking grouper."); return group(tasks); } - TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap()); - taskAssignmentManager.init(); - try { - List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size()); - if (containers == null || containers.size() == 1 || containerCount == 1) { - log.info("Balancing does not apply. 
Invoking grouper."); - Set<ContainerModel> models = group(tasks); - saveTaskAssignments(models, taskAssignmentManager); - return models; - } - - int prevContainerCount = containers.size(); - int containerDelta = containerCount - prevContainerCount; - if (containerDelta == 0) { - log.info("Container count has not changed. Reusing previous container models."); - return buildContainerModels(tasks, containers); - } - log.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount); - - // Calculate the expected task count per container - int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount); + int prevContainerCount = containers.size(); + int containerDelta = containerCount - prevContainerCount; + if (containerDelta == 0) { + LOG.info("Container count has not changed. Reusing previous container models."); + return TaskGroup.buildContainerModels(tasks, containers); + } + LOG.info("Container count changed from {} to {}. 
Balancing tasks.", prevContainerCount, containerCount); - // Collect excess tasks from over-assigned containers - List<String> taskNamesToReassign = new LinkedList<>(); - for (int i = 0; i < prevContainerCount; i++) { - TaskGroup taskGroup = containers.get(i); - while (taskGroup.size() > expectedTaskCountPerContainer[i]) { - taskNamesToReassign.add(taskGroup.removeTask()); - } - } + // Calculate the expected task count per container + int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount); - // Assign tasks to the under-assigned containers - if (containerDelta > 0) { - List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount); - containers.addAll(newContainers); - } else { - containers = containers.subList(0, containerCount); + // Collect excess tasks from over-assigned containers + List<String> taskNamesToReassign = new LinkedList<>(); + for (int i = 0; i < prevContainerCount; i++) { + TaskGroup taskGroup = containers.get(i); + while (taskGroup.size() > expectedTaskCountPerContainer[i]) { + taskNamesToReassign.add(taskGroup.removeLastTaskName()); } - assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers); + } - // Transform containers to containerModel - Set<ContainerModel> models = buildContainerModels(tasks, containers); + // Assign tasks to the under-assigned containers + if (containerDelta > 0) { + List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount); + containers.addAll(newContainers); + } else { + containers = containers.subList(0, containerCount); + } - // Save the results - saveTaskAssignments(models, taskAssignmentManager); + assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers); - return models; - } finally { - taskAssignmentManager.close(); - } + return TaskGroup.buildContainerModels(tasks, containers); } /** - * Reads the task-container mapping from the provided 
{@link TaskAssignmentManager} and returns a + * Reads the task-container mapping from the provided {@link GrouperMetadata} and returns a * list of TaskGroups, ordered ascending by containerId. * - * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to retrieve the previous mapping. - * @param taskCount the number of tasks, for validation against the persisted tasks. - * @return a list of TaskGroups, ordered ascending by containerId or {@code null} - * if the previous mapping doesn't exist or isn't usable. + * @param grouperMetadata the {@link GrouperMetadata} will be used to retrieve the previous task to container assignments. + * @param taskCount the number of tasks, for validation against the persisted tasks. + * @return a list of TaskGroups, ordered ascending by containerId or {@code null} + * if the previous mapping doesn't exist or isn't usable. */ - private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) { - Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment(); + private List<TaskGroup> getPreviousContainers(GrouperMetadata grouperMetadata, int taskCount) { + Map<TaskName, String> taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment(); taskToContainerId.values().forEach(id -> { try { int intId = Integer.parseInt(id); @@ -169,19 +149,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe); } }); + if (taskToContainerId.isEmpty()) { - log.info("No task assignment map was saved."); + LOG.info("No task assignment map was saved."); return null; } else if (taskCount != taskToContainerId.size()) { - log.warn( - "Current task count {} does not match saved task count {}. 
Stateful jobs may observe misalignment of keys!", - taskCount, taskToContainerId.size()); - // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that - // without a much more complicated mapping. Further, the partition count may have changed, which means - // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary - // data associated with the incoming keys. Warn the user and default to grouper - // In this scenario the tasks may have been reduced, so we need to delete all the existing messages - taskAssignmentManager.deleteTaskContainerMappings(taskToContainerId.keySet()); return null; } @@ -189,27 +161,13 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { try { containers = getOrderedContainers(taskToContainerId); } catch (Exception e) { - log.error("Exception while parsing task mapping", e); + LOG.error("Exception while parsing task mapping", e); return null; } return containers; } /** - * Saves the task assignments specified by containers using the provided TaskAssignementManager. - * - * @param containers the set of containers from which the task assignments will be saved. - * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to save the mappings. - */ - private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) { - for (ContainerModel container : containers) { - for (TaskName taskName : container.getTasks().keySet()) { - taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getId()); - } - } - } - - /** * Verifies the input tasks argument and throws {@link IllegalArgumentException} if it is invalid. * * @param tasks the tasks to validate. @@ -252,13 +210,12 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { * @param containers the containers (as {@link TaskGroup}) to which the tasks will be assigned. 
*/ // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount) - private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign, - List<TaskGroup> containers) { + private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign, List<TaskGroup> containers) { for (TaskGroup taskGroup : containers) { for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) { String taskName = taskNamesToAssign.remove(0); taskGroup.addTaskName(taskName); - log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId()); + LOG.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId()); } } } @@ -288,53 +245,20 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { } /** - * Translates the list of TaskGroup instances to a set of ContainerModel instances, using the - * set of TaskModel instances. - * - * @param tasks the TaskModels to assign to the ContainerModels. - * @param containerTasks the TaskGroups defining how the tasks should be grouped. - * @return a mutable set of ContainerModels. 
- */ - private Set<ContainerModel> buildContainerModels(Set<TaskModel> tasks, List<TaskGroup> containerTasks) { - // Map task names to models - Map<String, TaskModel> taskNameToModel = new HashMap<>(); - for (TaskModel model : tasks) { - taskNameToModel.put(model.getTaskName().getTaskName(), model); - } - - // Build container models - Set<ContainerModel> containerModels = new HashSet<>(); - for (TaskGroup container : containerTasks) { - Map<TaskName, TaskModel> containerTaskModels = new HashMap<>(); - for (String taskName : container.taskNames) { - TaskModel model = taskNameToModel.get(taskName); - containerTaskModels.put(model.getTaskName(), model); - } - containerModels.add( - new ContainerModel(container.containerId, containerTaskModels)); - } - return Collections.unmodifiableSet(containerModels); - } - - /** * Converts the task->containerId map to an ordered list of {@link TaskGroup} instances. * * @param taskToContainerId a map from each task name to the containerId to which it is assigned. * @return a list of TaskGroups ordered ascending by containerId. 
*/ - private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) { - log.debug("Got task to container map: {}", taskToContainerId); + private List<TaskGroup> getOrderedContainers(Map<TaskName, String> taskToContainerId) { + LOG.debug("Got task to container map: {}", taskToContainerId); // Group tasks by container Id - HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>(); - for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) { - String taskName = entry.getKey(); + Map<String, List<String>> containerIdToTaskNames = new HashMap<>(); + for (Map.Entry<TaskName, String> entry : taskToContainerId.entrySet()) { + String taskName = entry.getKey().getTaskName(); String containerId = entry.getValue(); - List<String> taskNames = containerIdToTaskNames.get(containerId); - if (taskNames == null) { - taskNames = new ArrayList<>(); - containerIdToTaskNames.put(containerId, taskNames); - } + List<String> taskNames = containerIdToTaskNames.computeIfAbsent(containerId, k -> new ArrayList<>()); taskNames.add(taskName); } @@ -347,36 +271,4 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper { return containerTasks; } - - /** - * A mutable group of tasks and an associated container id. - * - * Used as a temporary mutable container until the final ContainerModel is known. 
- */ - private static class TaskGroup { - private final List<String> taskNames = new LinkedList<>(); - private final String containerId; - - private TaskGroup(String containerId, List<String> taskNames) { - this.containerId = containerId; - Collections.sort(taskNames); // For consistency because the taskNames came from a Map - this.taskNames.addAll(taskNames); - } - - public String getContainerId() { - return containerId; - } - - public void addTaskName(String taskName) { - taskNames.add(taskName); - } - - public String removeTask() { - return taskNames.remove(taskNames.size() - 1); - } - - public int size() { - return taskNames.size(); - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java index 06aba33..5acf5b8 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java @@ -19,6 +19,7 @@ package org.apache.samza.container.grouper.task; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; /** * Factory to build the GroupByContainerCount class. 
@@ -26,6 +27,6 @@ import org.apache.samza.config.Config; public class GroupByContainerCountFactory implements TaskNameGrouperFactory { @Override public TaskNameGrouper build(Config config) { - return new GroupByContainerCount(config); + return new GroupByContainerCount(new JobConfig(config).getContainerCount()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java index 9dab943..7c11da4 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java @@ -19,27 +19,34 @@ package org.apache.samza.container.grouper.task; -import java.util.Arrays; -import java.util.stream.Collectors; -import org.apache.samza.container.TaskName; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.TaskModel; - import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import org.apache.commons.collections4.MapUtils; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.runtime.LocationId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * Simple grouper. 
- * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container - * IDs as an argument. Please note - this first implementation ignores locality information. + * A {@link TaskNameGrouper} implementation that provides two different grouping strategies: + * + * - One that assigns the tasks to the available containerIds in a round robin fashion. + * - The other that generates an equidistributed and locality-aware task to container assignment. */ public class GroupByContainerIds implements TaskNameGrouper { private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class); @@ -49,6 +56,9 @@ public class GroupByContainerIds implements TaskNameGrouper { this.startContainerCount = count; } + /** + * {@inheritDoc} + */ @Override public Set<ContainerModel> group(Set<TaskModel> tasks) { List<String> containerIds = new ArrayList<>(startContainerCount); @@ -58,30 +68,40 @@ public class GroupByContainerIds implements TaskNameGrouper { return group(tasks, containerIds); } - public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) { - if (containersIds == null) + /** + * {@inheritDoc} + * + * When the number of taskModels is less than the number of available containerIds, + * then selects the lexicographically least `x` containerIds (where x is the number of taskModels). + * + * Otherwise, assigns the tasks to the available containerIds in a round robin fashion + * preserving the containerId in the final assignment. + */ + @Override + public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containerIds) { + if (containerIds == null) return this.group(tasks); - if (containersIds.isEmpty()) + if (containerIds.isEmpty()) throw new IllegalArgumentException("Must have at least one container"); if (tasks.isEmpty()) - throw new IllegalArgumentException("cannot group an empty set.
containerIds=" + Arrays + .toString(containerIds.toArray())); - if (containersIds.size() > tasks.size()) { - LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containersIds.size(), tasks.size()); + if (containerIds.size() > tasks.size()) { + LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containerIds.size(), tasks.size()); /** * Choose lexicographically least `x` containerIds(where x = tasks.size()). */ - containersIds = containersIds.stream() + containerIds = containerIds.stream() .sorted() .limit(tasks.size()) .collect(Collectors.toList()); - LOG.info("Generating containerModel with containers: {}.", containersIds); + LOG.info("Generating containerModel with containers: {}.", containerIds); } - int containerCount = containersIds.size(); + int containerCount = containerIds.size(); // Sort tasks by taskName. List<TaskModel> sortedTasks = new ArrayList<>(tasks); @@ -100,9 +120,118 @@ public class GroupByContainerIds implements TaskNameGrouper { // Convert to a Set of ContainerModel Set<ContainerModel> containerModels = new HashSet<>(); for (int i = 0; i < containerCount; i++) { - containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i])); + containerModels.add(new ContainerModel(containerIds.get(i), taskGroups[i])); } return Collections.unmodifiableSet(containerModels); } + + /** + * {@inheritDoc} + * + * When there are `t` tasks and `p` processors, where t >= p, a fair task distribution should ideally assign + * (t / p) tasks to each processor. In addition to guaranteeing a fair distribution, this {@link TaskNameGrouper} + * implementation generates a locationId aware task assignment to processors where it makes best efforts in assigning + * the tasks to processors with the same locality. + * + * Task assignment to processors is accomplished through the following two phases: + * + * 1. In the first phase, each task(T) is assigned to a processor(P) that satisfies the following constraints: + * A.
The processor(P) should have the same locality as the task(T). + * B. Number of tasks already assigned to the processor should be less than the (number of tasks / number of processors). + * + * 2. Each unassigned task from phase 1 is then mapped to any processor with task count less than the + * (number of tasks / number of processors). When no such processor exists, then the unassigned + * task is mapped to any processor from available processors in a round robin fashion. + */ + @Override + public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) { + // Validate that the task models are not empty. + Map<TaskName, LocationId> taskLocality = grouperMetadata.getTaskLocality(); + Preconditions.checkArgument(!taskModels.isEmpty(), "No tasks found. Likely due to no input partitions. Can't run a job with no tasks."); + + // Invoke the default grouper when the processor locality does not exist. + if (MapUtils.isEmpty(grouperMetadata.getProcessorLocality())) { + LOG.info("ProcessorLocality is empty. Generating with the default group method."); + return group(taskModels, new ArrayList<>()); + } + + Map<String, LocationId> processorLocality = new TreeMap<>(grouperMetadata.getProcessorLocality()); + /** + * When there are more processors than task models, choose the lexicographically least `x` processors (where x = taskModels.size()). + */ + if (processorLocality.size() > taskModels.size()) { + processorLocality = processorLocality.entrySet() + .stream() + .limit(taskModels.size()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + Map<LocationId, List<String>> locationIdToProcessors = new HashMap<>(); + Map<String, TaskGroup> processorIdToTaskGroup = new HashMap<>(); + + // Generate the {@see LocationId} to processors mapping and processorId to {@see TaskGroup} mapping.
+ processorLocality.forEach((processorId, locationId) -> { + List<String> processorIds = locationIdToProcessors.getOrDefault(locationId, new ArrayList<>()); + processorIds.add(processorId); + locationIdToProcessors.put(locationId, processorIds); + processorIdToTaskGroup.put(processorId, new TaskGroup(processorId, new ArrayList<>())); + }); + + int numTasksPerProcessor = taskModels.size() / processorLocality.size(); + Set<TaskName> assignedTasks = new HashSet<>(); + + /** + * A processor is considered under-assigned when number of tasks assigned to it is less than + * (number of tasks / number of processors). + * Map the tasks to the under-assigned processors with same locality. + */ + for (TaskModel taskModel : taskModels) { + LocationId taskLocationId = taskLocality.get(taskModel.getTaskName()); + if (taskLocationId != null) { + List<String> processorIds = locationIdToProcessors.getOrDefault(taskLocationId, new ArrayList<>()); + for (String processorId : processorIds) { + TaskGroup taskGroup = processorIdToTaskGroup.get(processorId); + if (taskGroup.size() < numTasksPerProcessor) { + taskGroup.addTaskName(taskModel.getTaskName().getTaskName()); + assignedTasks.add(taskModel.getTaskName()); + break; + } + } + } + } + + /** + * In some scenarios, the task either might not have any previous locality or might not have any + * processor that maps to its previous locality. This cyclic processorId's iterator helps us in + * those scenarios to assign the processorIds to those kind of tasks in a round robin fashion. + */ + Iterator<String> processorIdsCyclicIterator = Iterators.cycle(processorLocality.keySet()); + + // Order the taskGroups to choose a task group in a deterministic fashion for unassigned tasks. + List<TaskGroup> taskGroups = new ArrayList<>(processorIdToTaskGroup.values()); + taskGroups.sort(Comparator.comparing(TaskGroup::getContainerId)); + + /** + * For the tasks left over from the previous stage, map them to any under-assigned processor. 
+ * When an under-assigned processor doesn't exist, then map them to any processor from the + * available processors in a round robin manner. + */ + for (TaskModel taskModel : taskModels) { + if (!assignedTasks.contains(taskModel.getTaskName())) { + Optional<TaskGroup> underAssignedTaskGroup = taskGroups.stream() + .filter(taskGroup -> taskGroup.size() < numTasksPerProcessor) + .findFirst(); + if (underAssignedTaskGroup.isPresent()) { + underAssignedTaskGroup.get().addTaskName(taskModel.getTaskName().getTaskName()); + } else { + TaskGroup taskGroup = processorIdToTaskGroup.get(processorIdsCyclicIterator.next()); + taskGroup.addTaskName(taskModel.getTaskName().getTaskName()); + } + assignedTasks.add(taskModel.getTaskName()); + } + } + + return TaskGroup.buildContainerModels(taskModels, taskGroups); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java new file mode 100644 index 0000000..ef919d1 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.container.TaskName; +import org.apache.samza.runtime.LocationId; +import org.apache.samza.system.SystemStreamPartition; +import java.util.List; +import java.util.Map; + +/** + * Provides the historical metadata of the samza application. + */ +@InterfaceStability.Evolving +public interface GrouperMetadata { + + /** + * Gets the current processor locality of the job. + * @return the processorId to the {@link LocationId} assignment. + */ + Map<String, LocationId> getProcessorLocality(); + + /** + * Gets the current task locality of the job. + * @return the current {@link TaskName} to {@link LocationId} assignment. + */ + Map<TaskName, LocationId> getTaskLocality(); + + /** + * Gets the previous {@link TaskName} to {@link SystemStreamPartition} assignment of the job. + * @return the previous {@link TaskName} to {@link SystemStreamPartition} assignment. + */ + Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment(); + + + /** + * Gets the previous {@link TaskName} to processorId assignments of the job. + * @return the previous task to processorId assignment. 
+ */ + Map<TaskName, String> getPreviousTaskToProcessorAssignment(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java new file mode 100644 index 0000000..bc40bc4 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + +import org.apache.samza.container.TaskName; +import org.apache.samza.runtime.LocationId; +import org.apache.samza.system.SystemStreamPartition; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link GrouperMetadata} that holds the necessary historical metadata of + * the samza job. This is used by the {@link TaskNameGrouper} to generate optimal task assignments. 
+ */ +public class GrouperMetadataImpl implements GrouperMetadata { + + // Map of processorId to LocationId. + private final Map<String, LocationId> processorLocality; + + // Map of TaskName to LocationId. + private final Map<TaskName, LocationId> taskLocality; + + // Map of TaskName to a list of the input SystemStreamPartition's assigned to it. + private final Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignment; + + // Map of TaskName to ProcessorId. + private final Map<TaskName, String> previousTaskToProcessorAssignment; + + public GrouperMetadataImpl(Map<String, LocationId> processorLocality, Map<TaskName, LocationId> taskLocality, Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignments, Map<TaskName, String> previousTaskToProcessorAssignment) { + this.processorLocality = Collections.unmodifiableMap(processorLocality); + this.taskLocality = Collections.unmodifiableMap(taskLocality); + this.previousTaskToSSPAssignment = Collections.unmodifiableMap(previousTaskToSSPAssignments); + this.previousTaskToProcessorAssignment = Collections.unmodifiableMap(previousTaskToProcessorAssignment); + } + + @Override + public Map<String, LocationId> getProcessorLocality() { + return processorLocality; + } + + @Override + public Map<TaskName, LocationId> getTaskLocality() { + return taskLocality; + } + + @Override + public Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment() { + return previousTaskToSSPAssignment; + } + + @Override + public Map<TaskName, String> getPreviousTaskToProcessorAssignment() { + return this.previousTaskToProcessorAssignment; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 32bbf29..b6e946c 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -81,9 +81,6 @@ public class TaskAssignmentManager { this.valueSerde = valueSerde; MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class); this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry); - } - - public void init() { this.metadataStore.init(); } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java new file mode 100644 index 0000000..1fe0f40 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.container.grouper.task; + +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; + +import java.util.*; + +/** + * A mutable group of tasks and an associated container id. + * + * Used as a temporary mutable container until the final ContainerModel is known. + */ +class TaskGroup { + private final List<String> taskNames = new LinkedList<>(); + private final String containerId; + + TaskGroup(String containerId, List<String> taskNames) { + this.containerId = containerId; + this.taskNames.addAll(taskNames); + Collections.sort(this.taskNames); // For consistency because the taskNames came from a Map + } + + public String getContainerId() { + return containerId; + } + + public void addTaskName(String taskName) { + taskNames.add(taskName); + } + + public String removeLastTaskName() { + return taskNames.remove(taskNames.size() - 1); + } + + public int size() { + return taskNames.size(); + } + + /** + * Converts the {@link TaskGroup} list to a set of ContainerModel. + * + * @param taskModels the TaskModels to assign to the ContainerModels. + * @param taskGroups the TaskGroups defining how the tasks should be grouped. + * @return a set of ContainerModels. 
+ */ + public static Set<ContainerModel> buildContainerModels(Set<TaskModel> taskModels, Collection<TaskGroup> taskGroups) { + // Map task names to models + Map<String, TaskModel> taskNameToModel = new HashMap<>(); + for (TaskModel model : taskModels) { + taskNameToModel.put(model.getTaskName().getTaskName(), model); + } + + // Build container models + Set<ContainerModel> containerModels = new HashSet<>(); + for (TaskGroup container : taskGroups) { + Map<TaskName, TaskModel> containerTaskModels = new HashMap<>(); + for (String taskName : container.taskNames) { + TaskModel model = taskNameToModel.get(taskName); + containerTaskModels.put(model.getTaskName(), model); + } + containerModels.add(new ContainerModel(container.containerId, containerTaskModels)); + } + + return Collections.unmodifiableSet(containerModels); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java index 71b80cc..2124dfc 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java @@ -18,10 +18,9 @@ */ package org.apache.samza.container.grouper.task; -import java.util.List; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; - +import java.util.List; import java.util.Set; /** @@ -44,15 +43,39 @@ import java.util.Set; * </p> */ public interface TaskNameGrouper { + /** - * Group tasks into the containers they will share. + * Groups the taskModels into set of {@link ContainerModel} using the metadata of + * the job from {@link GrouperMetadata}. 
* - * @param tasks Set of tasks to group into containers. - * @return Set of containers, which contain the tasks that were passed in. + * @param taskModels the set of tasks to group into containers. + * @param grouperMetadata provides the historical metadata of the samza job. + * @return the grouped {@link ContainerModel} built from the provided taskModels. */ - Set<ContainerModel> group(Set<TaskModel> tasks); + default Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) { + return group(taskModels); + } - default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) { - return group(tasks); + /** + * Group the taskModels into set of {@link ContainerModel}. + * + * @param taskModels the set of {@link TaskModel} to group into containers. + * @return the grouped {@link ContainerModel} built from the provided taskModels. + */ + @Deprecated + default Set<ContainerModel> group(Set<TaskModel> taskModels) { + throw new UnsupportedOperationException(); + } + + /** + * Group the taskModels into set of {@link ContainerModel}. + * + * @param taskModels the set of {@link TaskModel} to group into containers. + * @param containersIds the list of container ids that has to be used in the {@link ContainerModel}. + * @return the grouped {@link ContainerModel} built from the provided taskModels. 
+ */ + @Deprecated + default Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) { + return group(taskModels); } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java index 8b967b7..37684f4 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java @@ -28,7 +28,7 @@ public interface TaskNameGrouperFactory { * Builds a {@link TaskNameGrouper}. The config can be used to read the necessary values which are needed int the * process of building the {@link TaskNameGrouper} * - * @param config configuration to which values can be used to build a {@link TaskNameGrouper} + * @param config configuration to use for building the {@link TaskNameGrouper} * @return a {@link TaskNameGrouper} implementation */ TaskNameGrouper build(Config config); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 0110551..0c5e368 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -104,7 +104,7 @@ public class ExecutionPlanner { // currently we don't support host-affinity in batch mode if (appConfig.getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH && clusterConfig.getHostAffinityEnabled()) { throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.", - ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED)); + ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 7328bc7..34d67cc 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -317,7 +317,8 @@ public class StreamProcessor { return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter), this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config), Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)), - Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null))); + Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)), + null); } private JobCoordinator createJobCoordinator() { http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index c5c0d78..a5a45ba 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -22,6 +22,14 @@ package 
org.apache.samza.runtime; import java.util.HashMap; import java.util.Map; import java.util.Random; + +import org.apache.samza.container.ContainerHeartbeatClient; +import org.apache.samza.container.ContainerHeartbeatMonitor; +import org.apache.samza.container.LocalityManager; +import org.apache.samza.container.SamzaContainer; +import org.apache.samza.container.SamzaContainer$; +import org.apache.samza.container.SamzaContainerListener; +import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.MDC; import org.apache.samza.SamzaException; import org.apache.samza.application.descriptors.ApplicationDescriptor; @@ -31,11 +39,6 @@ import org.apache.samza.application.ApplicationUtil; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.ShellCommandConfig; -import org.apache.samza.container.ContainerHeartbeatClient; -import org.apache.samza.container.ContainerHeartbeatMonitor; -import org.apache.samza.container.SamzaContainer; -import org.apache.samza.container.SamzaContainer$; -import org.apache.samza.container.SamzaContainerListener; import org.apache.samza.context.JobContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsReporter; @@ -47,7 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; - /** * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN. */ @@ -93,6 +95,7 @@ public class LocalContainerRunner { private static void run(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc, String containerId, JobModel jobModel, Config config) { TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc); + LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap()); SamzaContainer container = SamzaContainer$.MODULE$.apply( containerId, jobModel, @@ -100,7 +103,8 @@ public class LocalContainerRunner { taskFactory, JobContextImpl.fromConfigWithDefaults(config), Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)), - Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null))); + Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)), + localityManager); ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory() .createInstance(new ProcessorContext() { }, config); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 737ac3e..44fd811 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -18,16 +18,22 @@ */ package org.apache.samza.standalone; +import com.google.common.collect.ImmutableMap; import org.apache.samza.checkpoint.CheckpointManager; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.grouper.task.GrouperMetadata; +import org.apache.samza.container.grouper.task.GrouperMetadataImpl; import 
org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.coordinator.JobCoordinatorListener; +import org.apache.samza.runtime.LocationId; +import org.apache.samza.runtime.LocationIdProvider; +import org.apache.samza.runtime.LocationIdProviderFactory; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; @@ -35,7 +41,6 @@ import org.apache.samza.system.SystemAdmins; import org.apache.samza.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.Collections; /** @@ -65,11 +70,15 @@ public class PassthroughJobCoordinator implements JobCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(PassthroughJobCoordinator.class); private final String processorId; private final Config config; + private final LocationId locationId; private JobCoordinatorListener coordinatorListener = null; public PassthroughJobCoordinator(Config config) { this.processorId = createProcessorId(config); this.config = config; + LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class); + LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config); + this.locationId = locationIdProvider.getLocationId(); } @Override @@ -119,18 +128,13 @@ public class PassthroughJobCoordinator implements JobCoordinator { SystemAdmins systemAdmins = new SystemAdmins(config); StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance()); systemAdmins.start(); - String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID())); - - /** TODO: - Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also, - in 
SamzaContainer for writing locality info to the coordinator stream. This closely couples together - TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator - (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) - */ - JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, - Collections.singletonList(containerId)); - systemAdmins.stop(); - return jobModel; + try { + String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID())); + GrouperMetadata grouperMetadata = new GrouperMetadataImpl(ImmutableMap.of(String.valueOf(containerId), locationId), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + return JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata); + } finally { + systemAdmins.stop(); + } } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 64ae310..5442d6e 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -125,12 +125,13 @@ public class StorageRecovery extends CommandLine { * map */ private void getContainerModels() { - CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, new MetricsRegistryMap()); + MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap(); + CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, metricsRegistryMap); coordinatorStreamManager.register(getClass().getSimpleName()); coordinatorStreamManager.start(); 
coordinatorStreamManager.bootstrap(); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel(); + JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metricsRegistryMap).jobModel(); containers = jobModel.getContainers(); coordinatorStreamManager.stop(); } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 81c0465..8c5a3ba 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -20,6 +20,7 @@ package org.apache.samza.zk; import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; import java.util.HashSet; import java.util.HashMap; import java.util.List; @@ -40,6 +41,8 @@ import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.StorageConfig; import org.apache.samza.config.ZkConfig; import org.apache.samza.container.TaskName; +import org.apache.samza.container.grouper.task.GrouperMetadata; +import org.apache.samza.container.grouper.task.GrouperMetadataImpl; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelManager; @@ -47,17 +50,23 @@ import org.apache.samza.coordinator.LeaderElectorListener; import org.apache.samza.coordinator.StreamPartitionCountMonitor; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; +import 
org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.metrics.ReadableMetricsRegistry; +import org.apache.samza.runtime.LocationId; +import org.apache.samza.runtime.LocationIdProvider; +import org.apache.samza.runtime.LocationIdProviderFactory; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.MetricsReporterLoader; import org.apache.samza.util.SystemClock; import org.apache.samza.util.Util; +import org.apache.samza.zk.ZkUtils.ProcessorNode; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +107,7 @@ public class ZkJobCoordinator implements JobCoordinator { private final SystemAdmins systemAdmins; private final int debounceTimeMs; private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>(); + private final LocationId locationId; private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; @@ -131,14 +141,16 @@ public class ZkJobCoordinator implements JobCoordinator { this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer); systemAdmins = new SystemAdmins(config); streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance()); + LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class); + LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config); + this.locationId = locationIdProvider.getLocationId(); } 
@Override public void start() { ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); zkUtils.validateZkVersion(); - zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder - .getJobModelPathPrefix()}); + zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()}); startMetrics(); systemAdmins.start(); @@ -246,7 +258,13 @@ public class ZkJobCoordinator implements JobCoordinator { } void doOnProcessorChange() { - List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs(); + List<ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes(); + + List<String> currentProcessorIds = new ArrayList<>(); + for (ProcessorNode processorNode : processorNodes) { + currentProcessorIds.add(processorNode.getProcessorData().getProcessorId()); + } + Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds); if (currentProcessorIds.size() != uniqueProcessorIds.size()) { @@ -256,7 +274,7 @@ public class ZkJobCoordinator implements JobCoordinator { // Generate the JobModel LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds); - JobModel jobModel = generateNewJobModel(currentProcessorIds); + JobModel jobModel = generateNewJobModel(processorNodes); // Create checkpoint and changelog streams if they don't exist if (!hasCreatedStreams) { @@ -308,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator { /** * Generate new JobModel when becoming a leader or the list of processor changed. */ - private JobModel generateNewJobModel(List<String> processors) { + private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) { String zkJobModelVersion = zkUtils.getJobModelVersion(); // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper. 
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) { @@ -318,11 +336,9 @@ public class ZkJobCoordinator implements JobCoordinator { } cachedJobModelVersion = zkJobModelVersion; } - /** - * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container - * to host mapping) is passed in as null when building the jobModel. - */ - JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors); + + GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, processorNodes); + JobModel model = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata); return new JobModel(new MapConfig(), model.getContainers()); } @@ -343,6 +359,39 @@ public class ZkJobCoordinator implements JobCoordinator { }); } + /** + * Builds the {@link GrouperMetadataImpl} based upon provided {@param jobModelVersion} + * and {@param processorNodes}. + * @param jobModelVersion the most recent jobModelVersion available in the zookeeper. + * @param processorNodes the list of live processors in the zookeeper. + * @return the built grouper metadata. 
+ */ + private GrouperMetadataImpl getGrouperMetadata(String jobModelVersion, List<ProcessorNode> processorNodes) { + Map<TaskName, String> taskToProcessorId = new HashMap<>(); + Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>(); + if (jobModelVersion != null) { + JobModel jobModel = zkUtils.getJobModel(jobModelVersion); + for (ContainerModel containerModel : jobModel.getContainers().values()) { + for (TaskModel taskModel : containerModel.getTasks().values()) { + taskToProcessorId.put(taskModel.getTaskName(), containerModel.getId()); + for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) { + taskToSSPs.computeIfAbsent(taskModel.getTaskName(), k -> new ArrayList<>()); + taskToSSPs.get(taskModel.getTaskName()).add(partition); + } + } + } + } + + Map<String, LocationId> processorLocality = new HashMap<>(); + for (ProcessorNode processorNode : processorNodes) { + ProcessorData processorData = processorNode.getProcessorData(); + processorLocality.put(processorData.getProcessorId(), processorData.getLocationId()); + } + + Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality(); + return new GrouperMetadataImpl(processorLocality, taskLocality, taskToSSPs, taskToProcessorId); + } + class LeaderElectorListenerImpl implements LeaderElectorListener { @Override public void onBecomingLeader() { @@ -390,6 +439,11 @@ public class ZkJobCoordinator implements JobCoordinator { JobModel jobModel = getJobModel(); // start the container with the new model if (coordinatorListener != null) { + for (ContainerModel containerModel : jobModel.getContainers().values()) { + for (TaskName taskName : containerModel.getTasks().keySet()) { + zkUtils.writeTaskLocality(taskName, locationId); + } + } coordinatorListener.onNewJobModel(processorId, jobModel); } }); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 6349432..56ea577 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.Objects; import java.util.TreeSet; import java.util.concurrent.TimeUnit; @@ -37,8 +39,10 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.runtime.LocationId; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; @@ -239,6 +243,29 @@ public class ZkUtils { return processorNodes; } + public void writeTaskLocality(TaskName taskName, LocationId locationId) { + String taskLocalityPath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), taskName); + validatePaths(new String[] {taskLocalityPath}); + writeData(taskLocalityPath, locationId.getId()); + } + + public Map<TaskName, LocationId> readTaskLocality() { + Map<TaskName, LocationId> taskLocality = new HashMap<>(); + String taskLocalityPath = keyBuilder.getTaskLocalityPath(); + List<String> tasks = new ArrayList<>(); + if (zkClient.exists(taskLocalityPath)) { + tasks = zkClient.getChildren(taskLocalityPath); + } + for (String taskName : tasks) { + String taskPath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), taskName); + String locationId = 
zkClient.readData(taskPath, true); + if (locationId != null) { + taskLocality.put(new TaskName(taskName), new LocationId(locationId)); + } + } + return taskLocality; + } + /** * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes) * http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 03effe6..865658f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -127,8 +127,8 @@ object SamzaContainer extends Logging { taskFactory: TaskFactory[_], jobContext: JobContext, applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]], - applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]] - ) = { + applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]], + localityManager: LocalityManager = null) = { val config = jobContext.getConfig val containerModel = jobModel.getContainers.get(containerId) val containerName = "samza-container-%s" format containerId @@ -735,6 +735,7 @@ object SamzaContainer extends Logging { systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, + localityManager = localityManager, offsetManager = offsetManager, securityManager = securityManager, metrics = samzaContainerMetrics, @@ -990,16 +991,13 @@ class SamzaContainer( def storeContainerLocality { val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled - if (isHostAffinityEnabled) { - val 
localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry) + if (isHostAffinityEnabled && localityManager != null) { val containerId = containerContext.getContainerModel.getId val containerName = "SamzaContainer-" + containerId info("Registering %s with metadata store" format containerName) try { val hostInet = Util.getLocalHost - val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else "" - val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else "" - info("Writing container locality and JMX address to metadata store") + info("Writing container locality to metadata store") localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName) } catch { case uhe: UnknownHostException =>