This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
     new 1169b31  [SAMZA-2667] Refactor logic in JobModelManager so it is easier to use certain parts of it (#1515)
1169b31 is described below
commit 1169b316dd51a6c972c7fc7ab77ac7e34ba5fbde
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 23 16:57:54 2021 -0700
    [SAMZA-2667] Refactor logic in JobModelManager so it is easier to use certain parts of it (#1515)
    Changes:
    1. Extract JobModelManager.readJobModel into a separate class,
    JobModelCalculator, which is solely responsible for calculating the
    JobModel. Convert this logic from Scala to Java.
    2. Extract some of the logic of JobModelManager.apply (getting grouper
    metadata, updating task assignments) into a separate class,
    JobModelHelper, so that the logic is not tied to HttpServer for
    communication. Convert this logic from Scala to Java.
    3. Fix a bug in the deletion of standby task assignments. Assignments
    should be cleared when the old set of standby tasks differs from the new
    set, but that was not happening: the condition checked for equality
    rather than inequality, and it compared objects of different types, so it
    never matched. This bug fix is the only expected functionality change. A
    sketch of the corrected check follows below.
    4. Add more tests around the job model logic.
    API changes and upgrade/usage instructions: N/A
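
    Illustrative sketches (not part of the diff). First, a minimal sketch of
    the corrected standby-task check, assuming hypothetical task names and an
    existing TaskAssignmentManager instance; the variable names follow the
    new JobModelHelper code:

        // Both sets hold task-name strings, so Set.equals performs a value comparison.
        Set<String> standbyTaskNames = ImmutableSet.of("Standby-Partition 0-0");
        Set<String> previousStandbyTasks =
            ImmutableSet.of("Standby-Partition 0-0", "Standby-Partition 1-0");
        if (!standbyTaskNames.equals(previousStandbyTasks)) {
          // The standby set changed, so the stale task-to-container mappings are deleted.
          taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks);
        }

    The old Scala condition compared a set of Strings against a set of
    TaskNames with reference equality (eq), so the deletion branch could
    never fire.

    Second, call sites move from the removed JobModelManager.readJobModel to
    the new singleton calculator, as in the coordinators updated below:

        JobModel jobModel = JobModelCalculator.INSTANCE.calculateJobModel(
            config, changeLogPartitionMapping, streamMetadataCache, grouperMetadata);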
---
.../samza/coordinator/AzureJobCoordinator.java | 3 +-
.../samza/coordinator/JobModelCalculator.java | 240 ++++++++++
.../apache/samza/coordinator/JobModelHelper.java | 217 +++++++++
.../standalone/PassthroughJobCoordinator.java | 5 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 5 +-
.../apache/samza/coordinator/JobModelManager.scala | 356 +--------------
.../samza/coordinator/TestJobModelCalculator.java | 487 +++++++++++++++++++++
.../samza/coordinator/TestJobModelHelper.java | 472 ++++++++++++++++++++
.../samza/coordinator/TestJobModelManager.java | 268 ------------
...Coordinator.scala => TestJobModelManager.scala} | 37 +-
10 files changed, 1440 insertions(+), 650 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 08e8124..710eefb 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -367,7 +367,8 @@ public class AzureJobCoordinator implements JobCoordinator {
     // Generate the new JobModel
     GrouperMetadata grouperMetadata = new GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(),
         Collections.emptyMap(), Collections.emptyMap());
     JobModel newJobModel =
-        JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
+        JobModelCalculator.INSTANCE.calculateJobModel(this.config, Collections.emptyMap(), streamMetadataCache,
+            grouperMetadata);
     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
     // Publish the new job model
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java
new file mode 100644
index 0000000..09597f0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelCalculator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SSPGrouperProxy;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
+import org.apache.samza.container.grouper.task.TaskNameGrouperProxy;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionMatcher;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+public class JobModelCalculator {
+  private static final Logger LOG = LoggerFactory.getLogger(JobModelCalculator.class);
+  public static final JobModelCalculator INSTANCE = new JobModelCalculator();
+
+  private JobModelCalculator() {
+  }
+
+  /**
+   * Does the following:
+   * 1. Fetches metadata of the input streams defined in configuration through {@code streamMetadataCache}.
+   * 2. Applies the SSP grouper and task name grouper defined in the configuration to build the {@link JobModel}.
+   * @param originalConfig the configuration of the job.
+   * @param changeLogPartitionMapping the task to changelog partition mapping of the job.
+   * @param streamMetadataCache the cache that holds the partition metadata of the input streams.
+   * @param grouperMetadata provides the historical metadata of the application.
+   * @return the built {@link JobModel}.
+   */
+  public JobModel calculateJobModel(Config originalConfig, Map<TaskName, Integer> changeLogPartitionMapping,
+      StreamMetadataCache streamMetadataCache, GrouperMetadata grouperMetadata) {
+    // refresh config if enabled regex topic rewriter
+    Config refreshedConfig = refreshConfigByRegexTopicRewriter(originalConfig);
+
+    TaskConfig taskConfig = new TaskConfig(refreshedConfig);
+    // Do grouping to fetch TaskName to SSP mapping
+    Set<SystemStreamPartition> allSystemStreamPartitions =
+        getMatchedInputStreamPartitions(refreshedConfig, streamMetadataCache);
+
+    // processor list is required by some of the groupers. So, let's pass them as part of the config.
+    // Copy the config and add the processor list to the config copy.
+    // TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
+    Map<String, String> configMap = new HashMap<>(refreshedConfig);
+    configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality().keySet()));
+    SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap));
+
+    JobConfig jobConfig = new JobConfig(refreshedConfig);
+
+    Map<TaskName, Set<SystemStreamPartition>> groups;
+    if (jobConfig.isSSPGrouperProxyEnabled()) {
+      SSPGrouperProxy sspGrouperProxy = new SSPGrouperProxy(refreshedConfig, grouper);
+      groups = sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata);
+    } else {
+      LOG.warn(String.format(
+          "SSPGrouperProxy is disabled (%s = false). Stateful jobs may produce erroneous results if this is not enabled.",
+          JobConfig.SSP_INPUT_EXPANSION_ENABLED));
+      groups = grouper.group(allSystemStreamPartitions);
+    }
+    LOG.info(String.format(
+        "SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s",
+        grouper, groups.size(), groups));
+
+    // If no mappings are present (first time the job is running) we return -1, this will allow 0 to be the first change
+    // mapping.
+    int maxChangelogPartitionId = changeLogPartitionMapping.values().stream().max(Comparator.naturalOrder()).orElse(-1);
+    // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
+    TreeMap<TaskName, Set<SystemStreamPartition>> sortedGroups = new TreeMap<>(groups);
+    Set<TaskModel> taskModels = new HashSet<>();
+    for (Map.Entry<TaskName, Set<SystemStreamPartition>> group : sortedGroups.entrySet()) {
+      TaskName taskName = group.getKey();
+      Set<SystemStreamPartition> systemStreamPartitions = group.getValue();
+      Optional<Integer> changelogPartitionId = Optional.ofNullable(changeLogPartitionMapping.get(taskName));
+      Partition changelogPartition;
+      if (changelogPartitionId.isPresent()) {
+        changelogPartition = new Partition(changelogPartitionId.get());
+      } else {
+        // If we've never seen this TaskName before, then assign it a new changelog partition.
+        maxChangelogPartitionId++;
+        LOG.info(
+            String.format("New task %s is being assigned changelog partition %s.", taskName, maxChangelogPartitionId));
+        changelogPartition = new Partition(maxChangelogPartitionId);
+      }
+      taskModels.add(new TaskModel(taskName, systemStreamPartitions, changelogPartition));
+    }
+
+    // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
+    TaskNameGrouperFactory containerGrouperFactory =
+        ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory(), TaskNameGrouperFactory.class);
+    boolean standbyTasksEnabled = jobConfig.getStandbyTasksEnabled();
+    int standbyTaskReplicationFactor = jobConfig.getStandbyTaskReplicationFactor();
+    TaskNameGrouperProxy taskNameGrouperProxy =
+        new TaskNameGrouperProxy(containerGrouperFactory.build(refreshedConfig), standbyTasksEnabled,
+            standbyTaskReplicationFactor);
+    Set<ContainerModel> containerModels;
+    boolean isHostAffinityEnabled = new ClusterManagerConfig(refreshedConfig).getHostAffinityEnabled();
+    if (isHostAffinityEnabled) {
+      containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata);
+    } else {
+      containerModels =
+          taskNameGrouperProxy.group(taskModels, new ArrayList<>(grouperMetadata.getProcessorLocality().keySet()));
+    }
+
+    Map<String, ContainerModel> containerMap =
+        containerModels.stream().collect(Collectors.toMap(ContainerModel::getId, Function.identity()));
+    return new JobModel(refreshedConfig, containerMap);
+  }
+
+  /**
+   * Refresh Kafka topic list used as input streams if enabled {@link org.apache.samza.config.RegExTopicGenerator}
+   * @param originalConfig Samza job config
+   * @return refreshed config
+   */
+  private static Config refreshConfigByRegexTopicRewriter(Config originalConfig) {
+    JobConfig jobConfig = new JobConfig(originalConfig);
+    Optional<String> configRewriters = jobConfig.getConfigRewriters();
+    Config resultConfig = originalConfig;
+    if (configRewriters.isPresent()) {
+      for (String rewriterName : configRewriters.get().split(",")) {
+        String rewriterClass = jobConfig.getConfigRewriterClass(rewriterName)
+            .orElseThrow(() -> new ConfigException(
+                String.format("Unable to find class config for config rewriter %s.", rewriterName)));
+        if (rewriterClass.equalsIgnoreCase(RegExTopicGenerator.class.getName())) {
+          resultConfig = ConfigUtil.applyRewriter(resultConfig, rewriterName);
+        }
+      }
+    }
+    return resultConfig;
+  }
+
+  /**
+   * Builds the input {@see SystemStreamPartition} based upon the {@param config} defined by the user.
+   * @param config configuration to fetch the metadata of the input streams.
+   * @param streamMetadataCache required to query the partition metadata of the input streams.
+   * @return the input SystemStreamPartitions of the job.
+   */
+  private static Set<SystemStreamPartition> getMatchedInputStreamPartitions(Config config,
+      StreamMetadataCache streamMetadataCache) {
+    Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache);
+    JobConfig jobConfig = new JobConfig(config);
+    Optional<String> sspMatcherClassName = jobConfig.getSSPMatcherClass();
+    if (sspMatcherClassName.isPresent()) {
+      String sspMatcherConfigJobFactoryRegex = jobConfig.getSSPMatcherConfigJobFactoryRegex();
+      Optional<String> streamJobFactoryClass = jobConfig.getStreamJobFactoryClass();
+      if (streamJobFactoryClass.isPresent() && Pattern.matches(sspMatcherConfigJobFactoryRegex,
+          streamJobFactoryClass.get())) {
+        LOG.info(String.format("before match: allSystemStreamPartitions.size = %s", allSystemStreamPartitions.size()));
+        SystemStreamPartitionMatcher sspMatcher =
+            ReflectionUtil.getObj(sspMatcherClassName.get(), SystemStreamPartitionMatcher.class);
+        Set<SystemStreamPartition> matchedPartitions = sspMatcher.filter(allSystemStreamPartitions, config);
+        // Usually a small set hence ok to log at info level
+        LOG.info(String.format("after match: matchedPartitions = %s", matchedPartitions));
+        return matchedPartitions;
+      }
+    }
+    return allSystemStreamPartitions;
+  }
+
+  /**
+   * Finds the {@see SystemStreamPartitionGrouperFactory} from the {@param config}. Instantiates the
+   * {@see SystemStreamPartitionGrouper} object through the factory.
+   * @param config the configuration of the samza job.
+   * @return the instantiated {@see SystemStreamPartitionGrouper}.
+   */
+  private static SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    String factoryString = new JobConfig(config).getSystemStreamPartitionGrouperFactory();
+    SystemStreamPartitionGrouperFactory factory =
+        ReflectionUtil.getObj(factoryString, SystemStreamPartitionGrouperFactory.class);
+    return factory.getSystemStreamPartitionGrouper(config);
+  }
+
+  /**
+   * Computes the input system stream partitions of a samza job using the provided {@param config}
+   * and {@param streamMetadataCache}.
+   * @param config the configuration of the job.
+   * @param streamMetadataCache to query the partition metadata of the input streams.
+   * @return the input {@see SystemStreamPartition} of the samza job.
+   */
+  private static Set<SystemStreamPartition> getInputStreamPartitions(Config config,
+      StreamMetadataCache streamMetadataCache) {
+    TaskConfig taskConfig = new TaskConfig(config);
+    // Get the set of partitions for each SystemStream from the stream metadata
+    Map<SystemStream, SystemStreamMetadata> allMetadata = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(taskConfig.getInputStreams()).asScala().toSet(), true)).asJava();
+    Set<SystemStreamPartition> inputStreamPartitions = new HashSet<>();
+    allMetadata.forEach((systemStream, systemStreamMetadata) -> systemStreamMetadata.getSystemStreamPartitionMetadata()
+        .keySet()
+        .forEach(partition -> inputStreamPartitions.add(new SystemStreamPartition(systemStream, partition))));
+    return inputStreamPartitions;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java
new file mode 100644
index 0000000..c122c37
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobModelHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JobModelHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(JobModelHelper.class);
+
+  private final LocalityManager localityManager;
+  private final TaskAssignmentManager taskAssignmentManager;
+  private final TaskPartitionAssignmentManager taskPartitionAssignmentManager;
+  private final StreamMetadataCache streamMetadataCache;
+  private final JobModelCalculator jobModelCalculator;
+
+  public JobModelHelper(LocalityManager localityManager, TaskAssignmentManager taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, StreamMetadataCache streamMetadataCache,
+      JobModelCalculator jobModelCalculator) {
+    this.localityManager = localityManager;
+    this.taskAssignmentManager = taskAssignmentManager;
+    this.taskPartitionAssignmentManager = taskPartitionAssignmentManager;
+    this.streamMetadataCache = streamMetadataCache;
+    this.jobModelCalculator = jobModelCalculator;
+  }
+
+  public JobModel newJobModel(Config config, Map<TaskName, Integer> changelogPartitionMapping) {
+    GrouperMetadata grouperMetadata = getGrouperMetadata(config, this.localityManager, this.taskAssignmentManager,
+        this.taskPartitionAssignmentManager);
+    JobModel jobModel =
+        this.jobModelCalculator.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+            grouperMetadata);
+    updateTaskAssignments(jobModel, this.taskAssignmentManager, this.taskPartitionAssignmentManager, grouperMetadata);
+    return jobModel;
+  }
+
+  private GrouperMetadata getGrouperMetadata(Config config, LocalityManager localityManager,
+      TaskAssignmentManager taskAssignmentManager, TaskPartitionAssignmentManager taskPartitionAssignmentManager) {
+    Map<String, LocationId> processorLocality = getProcessorLocality(config, localityManager);
+    Map<TaskName, TaskMode> taskModes = taskAssignmentManager.readTaskModes();
+
+    Map<TaskName, String> taskNameToProcessorId = new HashMap<>();
+    Map<TaskName, LocationId> taskLocality = new HashMap<>();
+    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have no task-mode or have an active task mode
+    taskAssignmentManager.readTaskAssignment().forEach((taskNameString, containerId) -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        taskNameToProcessorId.put(taskName, containerId);
+        if (processorLocality.containsKey(containerId)) {
+          taskLocality.put(taskName, processorLocality.get(containerId));
+        }
+      }
+    });
+
+    Map<SystemStreamPartition, List<String>> sspToTaskMapping =
+        taskPartitionAssignmentManager.readTaskPartitionAssignments();
+    Map<TaskName, List<SystemStreamPartition>> taskPartitionAssignments = new HashMap<>();
+    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
+    // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
+    sspToTaskMapping.forEach((systemStreamPartition, taskNames) -> taskNames.forEach(taskNameString -> {
+      TaskName taskName = new TaskName(taskNameString);
+      if (isActiveTask(taskName, taskModes)) {
+        taskPartitionAssignments.putIfAbsent(taskName, new ArrayList<>());
+        taskPartitionAssignments.get(taskName).add(systemStreamPartition);
+      }
+    }));
+    return new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId);
+  }
+
+  /**
+   * Retrieves and returns the processor locality of a samza job using provided {@see Config} and {@see LocalityManager}.
+   * @param config provides the configurations defined by the user. Required to connect to the storage layer.
+   * @param localityManager provides the processor to host mapping persisted to the metadata store.
+   * @return the processor locality.
+   */
+  private static Map<String, LocationId> getProcessorLocality(Config config, LocalityManager localityManager) {
+    Map<String, LocationId> containerToLocationId = new HashMap<>();
+    Map<String, ProcessorLocality> existingContainerLocality = localityManager.readLocality().getProcessorLocalities();
+
+    for (int i = 0; i < new JobConfig(config).getContainerCount(); i++) {
+      String containerId = Integer.toString(i);
+      LocationId locationId = Optional.ofNullable(existingContainerLocality.get(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotEmpty)
+          .map(LocationId::new)
+          // To handle the case when the container count is increased between two different runs of a samza-yarn job,
+          // set the locality of newly added containers to any_host.
+          .orElse(new LocationId("ANY_HOST"));
+      containerToLocationId.put(containerId, locationId);
+    }
+    return containerToLocationId;
+  }
+
+  /**
+   * This method does the following:
+   * 1. Deletes the existing task assignments if the partition-task grouping has changed from the previous run of the job.
+   * 2. Saves the newly generated task assignments to the storage layer through the {@param TaskAssignementManager}.
+   *
+   * @param jobModel                       represents the {@see JobModel} of the samza job.
+   * @param taskAssignmentManager          required to persist the processor to task assignments to the metadata store.
+   * @param taskPartitionAssignmentManager required to persist the task to partition assignments to the metadata store.
+   * @param grouperMetadata                provides the historical metadata of the samza application.
+   */
+  private void updateTaskAssignments(JobModel jobModel, TaskAssignmentManager taskAssignmentManager,
+      TaskPartitionAssignmentManager taskPartitionAssignmentManager, GrouperMetadata grouperMetadata) {
+    LOG.info("Storing the task assignments into metadata store.");
+    Set<String> activeTaskNames = new HashSet<>();
+    Set<String> standbyTaskNames = new HashSet<>();
+    Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>();
+
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        if (TaskMode.Active.equals(taskModel.getTaskMode())) {
+          activeTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        if (TaskMode.Standby.equals(taskModel.getTaskMode())) {
+          standbyTaskNames.add(taskModel.getTaskName().getTaskName());
+        }
+        systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions());
+      }
+    }
+
+    Map<TaskName, String> previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
+    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
+      LOG.warn(String.format(
+          "Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!",
+          activeTaskNames.size(), previousTaskToContainerId.size()));
+      // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
+      // without a much more complicated mapping. Further, the partition count may have changed, which means
+      // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
+      // data associated with the incoming keys. Warn the user and default to grouper
+      // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
+      taskAssignmentManager.deleteTaskContainerMappings(
+          previousTaskToContainerId.keySet().stream().map(TaskName::getTaskName).collect(Collectors.toList()));
+      taskPartitionAssignmentManager.delete(systemStreamPartitions);
+    }
+
+    // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
+    // changed, we log a warning and delete the existing mapping for these tasks
+    Set<String> previousStandbyTasks = taskAssignmentManager.readTaskModes()
+        .entrySet()
+        .stream()
+        .filter(taskNameToTaskModeEntry -> TaskMode.Standby.equals(taskNameToTaskModeEntry.getValue()))
+        .map(taskNameToTaskModeEntry -> taskNameToTaskModeEntry.getKey().getTaskName())
+        .collect(Collectors.toSet());
+    if (!standbyTaskNames.equals(previousStandbyTasks)) {
+      LOG.info(
+          String.format("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s",
+              standbyTaskNames, previousStandbyTasks));
+      taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks);
+    }
+
+    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
+    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic.
+    Map<SystemStreamPartition, List<String>> sspToTaskNameMap = new HashMap<>();
+    Map<String, Map<String, TaskMode>> taskContainerMappings = new HashMap<>();
+    for (ContainerModel containerModel : jobModel.getContainers().values()) {
+      containerModel.getTasks().forEach((taskName, taskModel) -> {
+        taskContainerMappings.putIfAbsent(containerModel.getId(), new HashMap<>());
+        taskContainerMappings.get(containerModel.getId()).put(taskName.getTaskName(), taskModel.getTaskMode());
+        taskModel.getSystemStreamPartitions().forEach(systemStreamPartition -> {
+          sspToTaskNameMap.putIfAbsent(systemStreamPartition, new ArrayList<>());
+          sspToTaskNameMap.get(systemStreamPartition).add(taskName.getTaskName());
+        });
+      });
+    }
+    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);
+    taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
+  }
+
+  private static boolean isActiveTask(TaskName taskName, Map<TaskName, TaskMode> taskModes) {
+    return !taskModes.containsKey(taskName) || TaskMode.Active.equals(taskModes.get(taskName));
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 7bf29fe..a340bef 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -26,7 +26,7 @@ import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelCalculator;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
@@ -130,7 +130,8 @@ public class PassthroughJobCoordinator implements JobCoordinator {
       GrouperMetadata grouperMetadata =
           new GrouperMetadataImpl(ImmutableMap.of(String.valueOf(containerId), locationId), Collections.emptyMap(),
               Collections.emptyMap(), Collections.emptyMap());
-      return JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
+      return JobModelCalculator.INSTANCE.calculateJobModel(this.config, Collections.emptyMap(), streamMetadataCache,
+          grouperMetadata);
     } finally {
       systemAdmins.stop();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index bc24752..17a8d5c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -42,7 +42,7 @@ import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelCalculator;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
@@ -396,7 +396,8 @@ public class ZkJobCoordinator implements JobCoordinator {
}
     GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, processorNodes);
-    JobModel model = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata);
+    JobModel model = JobModelCalculator.INSTANCE.calculateJobModel(config, changeLogPartitionMap, streamMetadataCache,
+        grouperMetadata);
     return new JobModel(new MapConfig(), model.getContainers());
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5dd662b..25ce582 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -19,39 +19,21 @@
package org.apache.samza.coordinator
-import java.util
-import java.util.concurrent.atomic.AtomicReference
-
-import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.config._
-import org.apache.samza.config.Config
-import org.apache.samza.container.grouper.stream.SSPGrouperProxy
-import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import org.apache.samza.config.{Config, _}
+import org.apache.samza.container.{LocalityManager, TaskName}
import org.apache.samza.container.grouper.task._
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
-import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
-import org.apache.samza.container.LocalityManager
-import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet, LocalityServlet}
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.coordinator.stream.messages.{SetContainerHostMapping, SetTaskContainerMapping, SetTaskModeMapping, SetTaskPartitionMapping}
import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskMode
-import org.apache.samza.job.model.TaskModel
import org.apache.samza.metadatastore.MetadataStore
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.runtime.LocationId
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.system._
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import org.apache.samza.util.{ConfigUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.Logging
-import scala.collection.JavaConverters
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import java.util
+import java.util.concurrent.atomic.AtomicReference
/**
* Helper companion object that is responsible for wiring up a JobModelManager
@@ -91,15 +73,13 @@ object JobModelManager extends Logging {
     try {
       systemAdmins.start()
       val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
-      val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
-
-      val jobModel = readJobModel(config, changelogPartitionMapping, streamMetadataCache, grouperMetadata)
+      val jobModelHelper = new JobModelHelper(localityManager, taskAssignmentManager, taskPartitionAssignmentManager,
+        streamMetadataCache, JobModelCalculator.INSTANCE)
+      val jobModel = jobModelHelper.newJobModel(config, changelogPartitionMapping)
       val jobModelToServe = new JobModel(jobModel.getConfig, jobModel.getContainers)
       val serializedJobModelToServe = SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModelToServe)
       serializedJobModelRef.set(serializedJobModelToServe)
-      updateTaskAssignments(jobModel, taskAssignmentManager, taskPartitionAssignmentManager, grouperMetadata)
-
       val clusterManagerConfig = new ClusterManagerConfig(config)
       val server = new HttpServer(port = clusterManagerConfig.getCoordinatorUrlPort)
       server.addServlet("/", new JobServlet(serializedJobModelRef))
@@ -112,322 +92,6 @@ object JobModelManager extends Logging {
       // Not closing coordinatorStreamStore, since {@code ClusterBasedJobCoordinator} uses it to read container locality through {@code JobModel}.
}
}
-
-  /**
-    * Builds the {@see GrouperMetadataImpl} for the samza job.
-    * @param config represents the configurations defined by the user.
-    * @param localityManager provides the processor to host mapping persisted to the metadata store.
-    * @param taskAssignmentManager provides the processor to task assignments persisted to the metadata store.
-    * @param taskPartitionAssignmentManager provides the task to partition assignments persisted to the metadata store.
-    * @return the instantiated {@see GrouperMetadata}.
-    */
-  def getGrouperMetadata(config: Config, localityManager: LocalityManager, taskAssignmentManager: TaskAssignmentManager, taskPartitionAssignmentManager: TaskPartitionAssignmentManager) = {
-    val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
-    val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
-
-    // We read the taskAssignment only for ActiveTasks, i.e., tasks that have no task-mode or have an active task mode
-    val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
-      filterKeys(taskName => !taskModes.containsKey(new TaskName(taskName)) || taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
-
-
-    val taskNameToProcessorId: util.Map[TaskName, String] = new util.HashMap[TaskName, String]()
-    for ((taskName, processorId) <- taskAssignment) {
-      taskNameToProcessorId.put(new TaskName(taskName), processorId)
-    }
-
-    val taskLocality: util.Map[TaskName, LocationId] = new util.HashMap[TaskName, LocationId]()
-    for ((taskName, processorId) <- taskAssignment) {
-      if (processorLocality.containsKey(processorId)) {
-        taskLocality.put(new TaskName(taskName), processorLocality.get(processorId))
-      }
-    }
-
-    val sspToTaskMapping: util.Map[SystemStreamPartition, util.List[String]] = taskPartitionAssignmentManager.readTaskPartitionAssignments()
-    val taskPartitionAssignments: util.Map[TaskName, util.List[SystemStreamPartition]] = new util.HashMap[TaskName, util.List[SystemStreamPartition]]()
-
-    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
-    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
-    // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
-    sspToTaskMapping foreach { case (systemStreamPartition: SystemStreamPartition, taskNames: util.List[String]) =>
-      for (task <- taskNames) {
-        val taskName: TaskName = new TaskName(task)
-
-        // We read the partition assignments only for active-tasks, i.e., tasks that have no task-mode or have an active task mode
-        if (!taskModes.containsKey(taskName) || taskModes.get(taskName).eq(TaskMode.Active)) {
-          taskPartitionAssignments.putIfAbsent(taskName, new util.ArrayList[SystemStreamPartition]())
-          taskPartitionAssignments.get(taskName).add(systemStreamPartition)
-        }
-      }
-    }
-    new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId)
-  }
-
-  /**
-    * Retrieves and returns the processor locality of a samza job using provided {@see Config} and {@see LocalityManager}.
-    * @param config provides the configurations defined by the user. Required to connect to the storage layer.
-    * @param localityManager provides the processor to host mapping persisted to the metadata store.
-    * @return the processor locality.
-    */
-  def getProcessorLocality(config: Config, localityManager: LocalityManager) = {
-    val containerToLocationId: util.Map[String, LocationId] = new util.HashMap[String, LocationId]()
-    val existingContainerLocality = localityManager.readLocality().getProcessorLocalities
-
-    for (containerId <- 0 until new JobConfig(config).getContainerCount) {
-      val preferredHost = Option.apply(existingContainerLocality.get(containerId.toString))
-        .map(containerLocality => containerLocality.host())
-        .filter(host => host.nonEmpty)
-        .orNull
-      // To handle the case when the container count is increased between two different runs of a samza-yarn job,
-      // set the locality of newly added containers to any_host.
-      var locationId: LocationId = new LocationId("ANY_HOST")
-      if (preferredHost != null) {
-        locationId = new LocationId(preferredHost)
-      }
-      containerToLocationId.put(containerId.toString, locationId)
-    }
-
-    containerToLocationId
-  }
-
-  /**
-    * This method does the following:
-    * 1. Deletes the existing task assignments if the partition-task grouping has changed from the previous run of the job.
-    * 2. Saves the newly generated task assignments to the storage layer through the {@param TaskAssignementManager}.
-    *
-    * @param jobModel              represents the {@see JobModel} of the samza job.
-    * @param taskAssignmentManager required to persist the processor to task assignments to the metadata store.
-    * @param taskPartitionAssignmentManager required to persist the task to partition assignments to the metadata store.
-    * @param grouperMetadata       provides the historical metadata of the samza application.
-    */
-  def updateTaskAssignments(jobModel: JobModel,
-                            taskAssignmentManager: TaskAssignmentManager,
-                            taskPartitionAssignmentManager: TaskPartitionAssignmentManager,
-                            grouperMetadata: GrouperMetadata): Unit = {
-    info("Storing the task assignments into metadata store.")
-    val activeTaskNames: util.Set[String] = new util.HashSet[String]()
-    val standbyTaskNames: util.Set[String] = new util.HashSet[String]()
-    val systemStreamPartitions: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]()
-    for (container <- jobModel.getContainers.values()) {
-      for (taskModel <- container.getTasks.values()) {
-        if(taskModel.getTaskMode.eq(TaskMode.Active)) {
-          activeTaskNames.add(taskModel.getTaskName.getTaskName)
-        }
-
-        if(taskModel.getTaskMode.eq(TaskMode.Standby)) {
-          standbyTaskNames.add(taskModel.getTaskName.getTaskName)
-        }
-        systemStreamPartitions.addAll(taskModel.getSystemStreamPartitions)
-      }
-    }
-
-    val previousTaskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment
-    if (activeTaskNames.size() != previousTaskToContainerId.size()) {
-      warn("Current task count %s does not match saved task count %s. Stateful jobs may observe misalignment of keys!"
-        format (activeTaskNames.size(), previousTaskToContainerId.size()))
-      // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
-      // without a much more complicated mapping. Further, the partition count may have changed, which means
-      // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
-      // data associated with the incoming keys. Warn the user and default to grouper
-      // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
-      taskAssignmentManager.deleteTaskContainerMappings(previousTaskToContainerId.keys.map(taskName => taskName.getTaskName).asJava)
-      taskPartitionAssignmentManager.delete(systemStreamPartitions)
-    }
-
-    // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
-    // changed, we log a warning and delete the existing mapping for these tasks
-    val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x => x._2.eq(TaskMode.Standby))
-    if(standbyTaskNames.asScala.eq(previousStandbyTasks.keySet)) {
-      info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTasks.keySet))
-      taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
-    }
-
-    // Task to partition assignments is stored as {@see SystemStreamPartition} to list of {@see TaskName} in
-    // coordinator stream. This is done due to the 1 MB value size limit in a kafka topic. Conversion to
-    // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
-    val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
-
-    val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]()
-
-    for (container <- jobModel.getContainers.values()) {
-      for ((taskName, taskModel) <- container.getTasks) {
-        taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]())
-        taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode)
-        for (partition <- taskModel.getSystemStreamPartitions) {
-          if (!sspToTaskNameMap.containsKey(partition)) {
-            sspToTaskNameMap.put(partition, new util.ArrayList[String]())
-          }
-          sspToTaskNameMap.get(partition).add(taskName.getTaskName)
-        }
-      }
-    }
-
-    taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings)
-    taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap);
-  }
-
-  /**
-    * Computes the input system stream partitions of a samza job using the provided {@param config}
-    * and {@param streamMetadataCache}.
-    * @param config the configuration of the job.
-    * @param streamMetadataCache to query the partition metadata of the input streams.
-    * @return the input {@see SystemStreamPartition} of the samza job.
-    */
-  private def getInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache): Set[SystemStreamPartition] = {
-
-    val taskConfig = new TaskConfig(config)
-    // Expand regex input, if a regex-rewriter is defined in config
-    val inputSystemStreams =
-      JavaConverters.asScalaSetConverter(taskConfig.getInputStreams).asScala.toSet
-
-    // Get the set of partitions for each SystemStream from the stream metadata
-    streamMetadataCache
-      .getStreamMetadata(inputSystemStreams, partitionsMetadataOnly = true)
-      .flatMap {
-        case (systemStream, metadata) =>
-          metadata
-            .getSystemStreamPartitionMetadata
-            .asScala
-            .keys
-            .map(new SystemStreamPartition(systemStream, _))
-      }.toSet
-  }
-
-  /**
-    * Builds the input {@see SystemStreamPartition} based upon the {@param config} defined by the user.
-    * @param config configuration to fetch the metadata of the input streams.
-    * @param streamMetadataCache required to query the partition metadata of the input streams.
-    * @return the input SystemStreamPartitions of the job.
-    */
-  private def getMatchedInputStreamPartitions(config: Config, streamMetadataCache: StreamMetadataCache):
-  Set[SystemStreamPartition] = {
-    val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
-    val jobConfig = new JobConfig(config)
-    JavaOptionals.toRichOptional(jobConfig.getSSPMatcherClass).toOption match {
-      case Some(sspMatcherClassName) =>
-        val jfr = jobConfig.getSSPMatcherConfigJobFactoryRegex.r
-        JavaOptionals.toRichOptional(jobConfig.getStreamJobFactoryClass).toOption match {
-          case Some(jfr(_*)) =>
-            info("before match: allSystemStreamPartitions.size = %s" format allSystemStreamPartitions.size)
-            val sspMatcher = ReflectionUtil.getObj(sspMatcherClassName, classOf[SystemStreamPartitionMatcher])
-            val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet
-            // Usually a small set hence ok to log at info level
-            info("after match: matchedPartitions = %s" format matchedPartitions)
-            matchedPartitions
-          case _ => allSystemStreamPartitions
-        }
-      case _ => allSystemStreamPartitions
-    }
-  }
-
-  /**
-    * Finds the {@see SystemStreamPartitionGrouperFactory} from the {@param config}. Instantiates the {@see SystemStreamPartitionGrouper}
-    * object through the factory.
-    * @param config the configuration of the samza job.
-    * @return the instantiated {@see SystemStreamPartitionGrouper}.
-    */
-  private def getSystemStreamPartitionGrouper(config: Config) = {
-    val factoryString = new JobConfig(config).getSystemStreamPartitionGrouperFactory
-    val factory = ReflectionUtil.getObj(factoryString, classOf[SystemStreamPartitionGrouperFactory])
-    factory.getSystemStreamPartitionGrouper(config)
-  }
-
-  /**
-    * Refresh Kafka topic list used as input streams if enabled {@link org.apache.samza.config.RegExTopicGenerator}
-    * @param config Samza job config
-    * @return refreshed config
-    */
-  private def refreshConfigByRegexTopicRewriter(config: Config): Config = {
-    val jobConfig = new JobConfig(config)
-    JavaOptionals.toRichOptional(jobConfig.getConfigRewriters).toOption match {
-      case Some(rewriters) => rewriters.split(",").
-        filter(rewriterName => JavaOptionals.toRichOptional(jobConfig.getConfigRewriterClass(rewriterName)).toOption
-          .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-          .equalsIgnoreCase(classOf[RegExTopicGenerator].getName)).
-        foldLeft(config)(ConfigUtil.applyRewriter(_, _))
-      case _ => config
-    }
-  }
-
-  /**
-    * Does the following:
-    * 1. Fetches metadata of the input streams defined in configuration through {@param streamMetadataCache}.
-    * 2. Applies the {@see SystemStreamPartitionGrouper}, {@see TaskNameGrouper} defined in the configuration
-    * to build the {@see JobModel}.
-    * @param originalConfig the configuration of the job.
-    * @param changeLogPartitionMapping the task to changelog partition mapping of the job.
-    * @param streamMetadataCache the cache that holds the partition metadata of the input streams.
-    * @param grouperMetadata provides the historical metadata of the application.
-    * @return the built {@see JobModel}.
-    */
-  def readJobModel(originalConfig: Config,
-                   changeLogPartitionMapping: util.Map[TaskName, Integer],
-                   streamMetadataCache: StreamMetadataCache,
-                   grouperMetadata: GrouperMetadata): JobModel = {
-    // refresh config if enabled regex topic rewriter
-    val config = refreshConfigByRegexTopicRewriter(originalConfig)
-
-    val taskConfig = new TaskConfig(config)
-    // Do grouping to fetch TaskName to SSP mapping
-    val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
-
-    // processor list is required by some of the groupers. So, let's pass them as part of the config.
-    // Copy the config and add the processor list to the config copy.
-    // TODO: It is non-ideal to have config as a medium to transmit the locality information; especially, if the locality information evolves. Evaluate options on using context objects to pass dependent components.
-    val configMap = new util.HashMap[String, String](config)
-    configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", grouperMetadata.getProcessorLocality.keySet()))
-    val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
-
-    val jobConfig = new JobConfig(config)
-
-    val groups: util.Map[TaskName, util.Set[SystemStreamPartition]] = if (jobConfig.isSSPGrouperProxyEnabled) {
-      val sspGrouperProxy: SSPGrouperProxy = new SSPGrouperProxy(config, grouper)
-      sspGrouperProxy.group(allSystemStreamPartitions, grouperMetadata)
-    } else {
-      warn("SSPGrouperProxy is disabled (%s = false). Stateful jobs may produce erroneous results if this is not enabled." format JobConfig.SSP_INPUT_EXPANSION_ENABLED)
-      grouper.group(allSystemStreamPartitions)
-    }
-    info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups))
-
-    // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
-    // mapping.
-    var maxChangelogPartitionId = changeLogPartitionMapping.asScala.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
-    // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
-    val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
-
-    // Assign all SystemStreamPartitions to TaskNames.
-    val taskModels = {
-      sortedGroups.asScala.map { case (taskName, systemStreamPartitions) =>
-        val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match {
-          case Some(changelogPartitionId) => new Partition(changelogPartitionId)
-          case _ =>
-            // If we've never seen this TaskName before, then assign it a
-            // new changelog.
-            maxChangelogPartitionId += 1
-            info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
-            new Partition(maxChangelogPartitionId)
-        }
-        new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-      }.toSet
-    }
-
-    // Here is where we should put in a pluggable option for the
-    // SSPTaskNameGrouper for locality, load-balancing, etc.
-    val containerGrouperFactory =
-      ReflectionUtil.getObj(taskConfig.getTaskNameGrouperFactory, classOf[TaskNameGrouperFactory])
-    val standbyTasksEnabled = jobConfig.getStandbyTasksEnabled
-    val standbyTaskReplicationFactor = jobConfig.getStandbyTaskReplicationFactor
-    val taskNameGrouperProxy = new TaskNameGrouperProxy(containerGrouperFactory.build(config), standbyTasksEnabled, standbyTaskReplicationFactor)
-    var containerModels: util.Set[ContainerModel] = null
-    val isHostAffinityEnabled = new ClusterManagerConfig(config).getHostAffinityEnabled
-    if(isHostAffinityEnabled) {
-      containerModels = taskNameGrouperProxy.group(taskModels, grouperMetadata)
-    } else {
-      containerModels = taskNameGrouperProxy.group(taskModels, new util.ArrayList[String](grouperMetadata.getProcessorLocality.keySet()))
-    }
-
-    val containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap
-    new JobModel(config, containerMap.asJava)
-  }
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java
new file mode 100644
index 0000000..1210d40
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelCalculator.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.RegExTopicGenerator;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GroupByContainerCount;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskNameGrouper;
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionMatcher;
+import org.apache.samza.util.ConfigUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.collection.JavaConverters;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unfortunately, we still need powermock for testing the regex topic flow, since that flow checks for a specific
+ * rewriter class, and setting up the support for that specific class is too cumbersome.
+ * Many of the tests do not need powermock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConfigUtil.class})
+public class TestJobModelCalculator {
+ private static final String REGEX_REWRITER0 = "regexRewriter0";
+ private static final String REGEX_REWRITER1 = "regexRewriter1";
+  private static final SystemStream SYSTEM_STREAM0 = new SystemStream("system0", "stream0");
+  private static final SystemStream SYSTEM_STREAM1 = new SystemStream("system1", "stream1");
+
+ @Mock
+ private StreamMetadataCache streamMetadataCache;
+ @Mock
+ private GrouperMetadata grouperMetadata;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+    addStreamMetadataCacheMetadata(this.streamMetadataCache,
+        ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4), SYSTEM_STREAM1, buildSystemStreamMetadata(3)));
+ }
+
+ @Test
+ public void testBasicSingleStream() {
+ addStreamMetadataCacheMetadata(this.streamMetadataCache,
+ ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4)));
+ Map<TaskName, Integer> changeLogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0), ImmutableMap.of());
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0), taskName(2), taskModel(2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1), taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changeLogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testBasicMultipleStreams() {
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), ImmutableMap.of());
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testCustomSSPGrouper() {
+ // custom grouper only groups into two tasks, so only need 2 changelog partitions
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(2);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+ ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, Partition0SeparateFactory.class.getName()));
+ when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+ ImmutableMap.of("0", mock(LocationId.class), "1", mock(LocationId.class)));
+ Set<SystemStreamPartition> sspsForTask1 = new ImmutableSet.Builder<SystemStreamPartition>().add(
+ new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)))
+ .add(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(2)))
+ .add(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(3)))
+ .add(new SystemStreamPartition(SYSTEM_STREAM1, new Partition(1)))
+ .add(new SystemStreamPartition(SYSTEM_STREAM1, new Partition(2)))
+ .build();
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+ new ContainerModel("1",
+ ImmutableMap.of(taskName(1), new TaskModel(taskName(1),
sspsForTask1, new Partition(1)))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testCustomTaskNameGrouper() {
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+ ImmutableMap.of(TaskConfig.GROUPER_FACTORY, Task0SeparateFactory.class.getName()));
+ when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+ ImmutableMap.of("0", mock(LocationId.class), "1", mock(LocationId.class)));
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+ new ContainerModel("1",
+ ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(2), taskModel(2, 2, 2), taskName(3),
+ taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testWithRegexTopicRewriters() {
+ // this is the SystemStream that is directly in the config
+ SystemStream existingSystemStream = new SystemStream("existingSystem", "existingStream");
+ addStreamMetadataCacheMetadata(this.streamMetadataCache,
+ ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4), SYSTEM_STREAM1, buildSystemStreamMetadata(3),
+ existingSystemStream, buildSystemStreamMetadata(1)));
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+
+ PowerMockito.mockStatic(ConfigUtil.class);
+ // add SYSTEM_STREAM0 for one rewriter
+ PowerMockito.when(ConfigUtil.applyRewriter(any(), eq(REGEX_REWRITER0)))
+ .thenAnswer(invocation -> addSystemStreamInput(SYSTEM_STREAM0, invocation.getArgumentAt(0, Config.class)));
+ // add SYSTEM_STREAM1 for another rewriter
+ PowerMockito.when(ConfigUtil.applyRewriter(any(), eq(REGEX_REWRITER1)))
+ .thenAnswer(invocation -> addSystemStreamInput(SYSTEM_STREAM1, invocation.getArgumentAt(0, Config.class)));
+
+ Config config = config(ImmutableList.of(existingSystemStream),
+ ImmutableMap.of(JobConfig.CONFIG_REWRITERS, String.format("%s,%s", REGEX_REWRITER0, REGEX_REWRITER1),
+ String.format(JobConfig.CONFIG_REWRITER_CLASS, REGEX_REWRITER0), RegExTopicGenerator.class.getName(),
+ String.format(JobConfig.CONFIG_REWRITER_CLASS, REGEX_REWRITER1), RegExTopicGenerator.class.getName()));
+ Set<SystemStreamPartition> sspsForTask0 =
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0)),
+ new SystemStreamPartition(existingSystemStream, new Partition(0)));
+ TaskModel taskModel0 = new TaskModel(taskName(0), sspsForTask0, new Partition(0));
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel0, taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(3), taskModel(3, 3))));
+ Map<String, String> expectedConfigMap = new HashMap<>(config);
+ expectedConfigMap.put(TaskConfig.INPUT_STREAMS,
+ String.format("%s,%s,%s", taskInputString(existingSystemStream), taskInputString(SYSTEM_STREAM0),
+ taskInputString(SYSTEM_STREAM1)));
+ JobModel expected = new JobModel(new MapConfig(expectedConfigMap), containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testWithSSPFilter() {
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+ ImmutableMap.of(JobConfig.SSP_MATCHER_CLASS, Partition0Or1Filter.class.getName(),
+ JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, ".*MyJobFactory",
+ // this needs to match the regex in the line above
+ JobConfig.STREAM_JOB_FACTORY_CLASS, "org.apache.samza.custom.MyJobFactory"));
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testSSPMatcherConfigJobFactoryRegexNotMatched() {
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+ ImmutableMap.of(JobConfig.SSP_MATCHER_CLASS, Partition0Or1Filter.class.getName(),
+ JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, ".*MyJobFactory",
+ // this needs to not match the regex in the line above
+ JobConfig.STREAM_JOB_FACTORY_CLASS, "org.apache.samza.custom.OtherJobFactory"));
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testNoPreviousTasksAssignsNewChangelogPartitions() {
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), ImmutableMap.of());
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual = JobModelCalculator.INSTANCE.calculateJobModel(config, ImmutableMap.of(), this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testPreviousChangelogPartitionsMaintained() {
+ // existing changelog mapping has 2 tasks, but the job model ultimately will need 4 tasks
+ // intentionally using an "out-of-order" changelog mapping to make sure it gets maintained
+ Map<TaskName, Integer> changelogPartitionMapping = ImmutableMap.of(taskName(0), 1, taskName(1), 0);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1), ImmutableMap.of());
+ // these task models have special changelog partitions from the previous mapping
+ TaskModel taskModel0 = new TaskModel(taskName(0),
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))), new Partition(1));
+ TaskModel taskModel1 = new TaskModel(taskName(1),
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(1))), new Partition(0));
+ // tasks 2 and 3 will get assigned new changelog partitions
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel0, taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1, taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testSSPGrouperProxyUsed() {
+ addStreamMetadataCacheMetadata(this.streamMetadataCache,
+ ImmutableMap.of(SYSTEM_STREAM0, buildSystemStreamMetadata(4)));
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(2);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0),
+ ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY, Partition0SeparateFactory.class.getName(),
+ // need this to trigger SSPGrouperProxy logic
+ String.format(StorageConfig.FACTORY, "myStore"), "MyCustomStore"));
+ // custom SSP grouper expects a certain processor locality for another test, so add the locality here too
+ when(this.grouperMetadata.getProcessorLocality()).thenReturn(
+ ImmutableMap.of("0", mock(LocationId.class), "1", mock(LocationId.class)));
+ /*
+ * Even though the custom grouper factory would normally send the additional SSPs to task 1, the SSP grouper proxy
+ * should give task 0 some of the SSPs.
+ */
+ when(this.grouperMetadata.getPreviousTaskToSSPAssignment()).thenReturn(
+ ImmutableMap.of(taskName(0), ImmutableList.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(0))),
+ taskName(1), ImmutableList.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)))));
+ Set<SystemStreamPartition> sspsForTask0 =
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+ new SystemStreamPartition(SYSTEM_STREAM0, new Partition(2)));
+ Set<SystemStreamPartition> sspsForTask1 =
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+ new SystemStreamPartition(SYSTEM_STREAM0, new Partition(3)));
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0", new ContainerModel("0",
+ ImmutableMap.of(taskName(0), new TaskModel(taskName(0), sspsForTask0, new Partition(0)))), "1",
+ new ContainerModel("1",
+ ImmutableMap.of(taskName(1), new TaskModel(taskName(1), sspsForTask1, new Partition(1)))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testHostAffinityEnabled() {
+ Map<TaskName, Integer> changelogPartitionMapping = changelogPartitionMapping(4);
+ Config config = config(ImmutableList.of(SYSTEM_STREAM0, SYSTEM_STREAM1),
+ ImmutableMap.of(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true",
+ // make sure the group method which accepts GrouperMetadata is used
+ TaskConfig.GROUPER_FACTORY, GroupByContainerCountOverrideFactory.class.getName()));
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0, 0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1, 1), taskName(3), taskModel(3, 3))));
+ JobModel expected = new JobModel(config, containerModels);
+ JobModel actual =
+ JobModelCalculator.INSTANCE.calculateJobModel(config, changelogPartitionMapping, this.streamMetadataCache,
+ this.grouperMetadata);
+ assertEquals(expected, actual);
+ }
+
+ private static SystemStreamMetadata buildSystemStreamMetadata(int numPartitions) {
+ SystemStreamMetadata systemStreamMetadata = mock(SystemStreamMetadata.class);
+ Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionToMetadata =
+ IntStream.range(0, numPartitions)
+ .boxed()
+ .collect(
+ Collectors.toMap(Partition::new, i -> mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class)));
+ when(systemStreamMetadata.getSystemStreamPartitionMetadata()).thenReturn(partitionToMetadata);
+ return systemStreamMetadata;
+ }
+
+ private static TaskName taskName(int id) {
+ return new TaskName("Partition " + id);
+ }
+
+ private static TaskModel taskModel(int id, int partitionForSystemStream0) {
+ return new TaskModel(new TaskName("Partition " + id),
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0))),
+ new Partition(id));
+ }
+
+ private static TaskModel taskModel(int id, int partitionForSystemStream0, int partitionForSystemStream1) {
+ return new TaskModel(taskName(id),
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(partitionForSystemStream1))), new Partition(id));
+ }
+
+ private static Map<TaskName, Integer> changelogPartitionMapping(int numPartitions) {
+ return IntStream.range(0, numPartitions)
+ .boxed()
+ .collect(Collectors.toMap(TestJobModelCalculator::taskName, Function.identity()));
+ }
+
+ private static Config config(List<SystemStream> inputs, Map<String, String> extraConfigs) {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_CONTAINER_COUNT, "2");
+ configMap.put(TaskConfig.INPUT_STREAMS,
+ inputs.stream().map(TestJobModelCalculator::taskInputString).collect(Collectors.joining(",")));
+ configMap.putAll(extraConfigs);
+ return new MapConfig(configMap);
+ }
+
+ private static String taskInputString(SystemStream systemStream) {
+ return String.format("%s.%s", systemStream.getSystem(),
systemStream.getStream());
+ }
+
+ private static void addStreamMetadataCacheMetadata(StreamMetadataCache mockStreamMetadataCache,
+ Map<SystemStream, SystemStreamMetadata> systemStreamMetadataMap) {
+ scala.collection.immutable.Set<SystemStream> systemStreams =
+ JavaConverters.asScalaSetConverter(systemStreamMetadataMap.keySet()).asScala().toSet();
+ scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> scalaSystemStreamMetadataMap =
+ ScalaJavaUtil.toScalaMap(systemStreamMetadataMap);
+ when(mockStreamMetadataCache.getStreamMetadata(systemStreams, true)).thenReturn(scalaSystemStreamMetadataMap);
+ }
+
+ public static class Partition0SeparateFactory implements SystemStreamPartitionGrouperFactory {
+ @Override
+ public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+ // check that the "processor.list" gets passed through the config
+ assertEquals(ImmutableSet.of("0", "1"), ImmutableSet.copyOf(config.get(JobConfig.PROCESSOR_LIST).split(",")));
+ return new Partition0Separate();
+ }
+ }
+
+ /**
+ * Groups all SSPs into two tasks. The first task gets all of the SSPs with a partition id of 0. The second task gets
+ * all other SSPs. This is used for testing a custom {@link SystemStreamPartitionGrouper}.
+ */
+ private static class Partition0Separate implements SystemStreamPartitionGrouper {
+ @Override
+ public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> systemStreamPartitions) {
+ // if partition 0, then add to this first set
+ Set<SystemStreamPartition> sspsForTask0 = new HashSet<>();
+ // all SSPs that are not partition 0 go into this second set
+ Set<SystemStreamPartition> sspsForTask1 = new HashSet<>();
+ systemStreamPartitions.forEach(ssp -> {
+ if (ssp.getPartition().getPartitionId() == 0) {
+ sspsForTask0.add(ssp);
+ } else {
+ sspsForTask1.add(ssp);
+ }
+ });
+ return ImmutableMap.of(taskName(0), sspsForTask0, taskName(1), sspsForTask1);
+ }
+ }
+
+ public static class Task0SeparateFactory implements TaskNameGrouperFactory {
+ @Override
+ public TaskNameGrouper build(Config config) {
+ return new Task0Separate();
+ }
+ }
+
+ private static class Task0Separate implements TaskNameGrouper {
+ @Override
+ public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containerIds) {
+ // if task 0, then add to this map
+ Map<TaskName, TaskModel> tasksForContainer0 = new HashMap<>();
+ // all tasks that aren't task 0 go into this map
+ Map<TaskName, TaskModel> tasksForContainer1 = new HashMap<>();
+ taskModels.forEach(taskModel -> {
+ if (taskName(0).equals(taskModel.getTaskName())) {
+ tasksForContainer0.put(taskName(0), taskModel);
+ } else {
+ tasksForContainer1.put(taskModel.getTaskName(), taskModel);
+ }
+ });
+ return ImmutableSet.of(new ContainerModel("0", tasksForContainer0), new
ContainerModel("1", tasksForContainer1));
+ }
+ }
+
+ public static class Partition0Or1Filter implements SystemStreamPartitionMatcher {
+ @Override
+ public Set<SystemStreamPartition> filter(Set<SystemStreamPartition> systemStreamPartitions, Config config) {
+ return systemStreamPartitions.stream()
+ .filter(ssp -> ssp.getPartition().getPartitionId() <= 1)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ public static class GroupByContainerCountOverrideFactory implements TaskNameGrouperFactory {
+ @Override
+ public TaskNameGrouper build(Config config) {
+ return new GroupByContainerCountOverride(new JobConfig(config).getContainerCount());
+ }
+ }
+
+ private static class GroupByContainerCountOverride extends GroupByContainerCount {
+ public GroupByContainerCountOverride(int containerCount) {
+ super(containerCount);
+ }
+
+ @Override
+ public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+ throw new UnsupportedOperationException("This should not be called");
+ }
+ }
+
+ private static Config addSystemStreamInput(SystemStream systemStream, Config config) {
+ String existingInputs = config.get(TaskConfig.INPUT_STREAMS);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(existingInputs));
+ Map<String, String> newConfig = new HashMap<>(config);
+ newConfig.put(TaskConfig.INPUT_STREAMS, String.format("%s,%s",
existingInputs, taskInputString(systemStream)));
+ return new MapConfig(newConfig);
+ }
+}
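Stripped of the mocks, every test above reduces to the same calling pattern. A minimal sketch (not part of this commit, and assuming the same imports as the test file above, with streamMetadataCache and grouperMetadata instances already in scope); the config keys mirror the test helper config():

    // Hypothetical caller of the extracted calculator (illustrative only).
    Map<String, String> configMap = new HashMap<>();
    configMap.put(JobConfig.JOB_CONTAINER_COUNT, "2");
    configMap.put(TaskConfig.INPUT_STREAMS, "system0.stream0");
    Config config = new MapConfig(configMap);
    // An empty changelog mapping means each task gets a newly assigned changelog partition.
    JobModel jobModel = JobModelCalculator.INSTANCE.calculateJobModel(
        config, ImmutableMap.of(), streamMetadataCache, grouperMetadata);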
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java
new file mode 100644
index 0000000..985561b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelHelper.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestJobModelHelper {
+ private static final SystemStream SYSTEM_STREAM0 = new SystemStream("system0", "stream0");
+ private static final SystemStream SYSTEM_STREAM1 = new SystemStream("system1", "stream1");
+ private static final int NUM_CONTAINERS = 2;
+ private static final String HOST0 = "host-0.abc";
+ private static final String HOST1 = "host-1.abc";
+
+ @Mock
+ private LocalityManager localityManager;
+ @Mock
+ private LocalityModel localityModel;
+ @Mock
+ private TaskAssignmentManager taskAssignmentManager;
+ @Mock
+ private TaskPartitionAssignmentManager taskPartitionAssignmentManager;
+ @Mock
+ private StreamMetadataCache streamMetadataCache;
+ @Mock
+ private JobModelCalculator jobModelCalculator;
+ @Mock
+ private Map<TaskName, Integer> changelogPartitionMapping;
+ @Captor
+ private ArgumentCaptor<GrouperMetadata> grouperMetadataArgumentCaptor;
+
+ private JobModelHelper jobModelHelper;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(this.localityManager.readLocality()).thenReturn(this.localityModel);
+ this.jobModelHelper =
+ new JobModelHelper(this.localityManager, this.taskAssignmentManager, this.taskPartitionAssignmentManager,
+ this.streamMetadataCache, this.jobModelCalculator);
+ }
+
+ @Test
+ public void testNoPreviousJob() {
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0,
0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1,
1), taskName(3), taskModel(3, 3))));
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(), ImmutableMap.of(),
ImmutableMap.of(), ImmutableMap.of());
+ verifyNewTaskAssignments(ImmutableSet.of(), allSSPs(containerModels),
containerToTaskToMode(containerModels),
+ sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testSameJobModelAsPrevious() {
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0,
0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1,
1), taskName(3), taskModel(3, 3))));
+ setupOldTaskAssignments(containerModels);
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(),
anyHostTaskNameToLocationId(4),
+ activeTaskToSSPs(containerModels),
activeTaskToContainer(containerModels));
+ verifyNewTaskAssignments(null, null,
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testNewContainerWithProcessorLocality() {
+ Map<String, ContainerModel> oldContainerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0, 0))));
+ Map<String, ProcessorLocality> processorLocalities = ImmutableMap.of("0",
processorLocality("0", HOST0));
+
when(this.localityModel.getProcessorLocalities()).thenReturn(processorLocalities);
+ setupOldTaskAssignments(oldContainerModels);
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0,
0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1,
1), taskName(3), taskModel(3, 3))));
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(processorIdToLocationId(ImmutableMap.of("0", HOST0)),
+ ImmutableMap.of(taskName(0), new LocationId(HOST0)),
activeTaskToSSPs(oldContainerModels),
+ activeTaskToContainer(oldContainerModels));
+ verifyNewTaskAssignments(taskNameStrings(oldContainerModels,
TaskMode.Active), allSSPs(containerModels),
+ containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testAllProcessorLocalityExists() {
+ Map<String, ContainerModel> containerModels = ImmutableMap.of("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0,
0), taskName(2), taskModel(2, 2, 2))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1, 1,
1), taskName(3), taskModel(3, 3))));
+ Map<String, ProcessorLocality> processorLocalities =
+ ImmutableMap.of("0", processorLocality("0", HOST0), "1",
processorLocality("1", HOST1));
+
when(this.localityModel.getProcessorLocalities()).thenReturn(processorLocalities);
+ setupOldTaskAssignments(containerModels);
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(processorIdToLocationId(ImmutableMap.of("0", HOST0,
"1", HOST1)),
+ ImmutableMap.of(taskName(0), new LocationId(HOST0), taskName(1), new
LocationId(HOST1), taskName(2),
+ new LocationId(HOST0), taskName(3), new LocationId(HOST1)),
activeTaskToSSPs(containerModels),
+ activeTaskToContainer(containerModels));
+ verifyNewTaskAssignments(null, null,
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testAddBroadcastInput() {
+ Map<String, ContainerModel> oldContainerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1,
1))));
+ setupOldTaskAssignments(oldContainerModels);
+ TaskModel taskModel0 = taskModel(0, ImmutableSet.of(new
SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+ TaskModel taskModel1 = taskModel(1, ImmutableSet.of(new
SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel0)), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1)));
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(),
anyHostTaskNameToLocationId(2),
+ activeTaskToSSPs(oldContainerModels),
activeTaskToContainer(oldContainerModels));
+ verifyNewTaskAssignments(null, null,
containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testExistingBroadcastInput() {
+ TaskModel taskModel0 = taskModel(0, ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+ TaskModel taskModel1 = taskModel(1, ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(1)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(0))));
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel0)), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel1)));
+ setupOldTaskAssignments(containerModels);
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(), anyHostTaskNameToLocationId(2),
+ activeTaskToSSPs(containerModels), activeTaskToContainer(containerModels));
+ verifyNewTaskAssignments(null, null, containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testNewStandbyContainers() {
+ Map<String, ContainerModel> oldContainerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1,
1))));
+ setupOldTaskAssignments(oldContainerModels);
+ Map<String, ContainerModel> containerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1,
1))), "0-0",
+ new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0),
standbyTaskModel(0, 0))), "1-0",
+ new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0),
standbyTaskModel(1, 0))));
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(),
anyHostTaskNameToLocationId(2),
+ activeTaskToSSPs(oldContainerModels),
activeTaskToContainer(oldContainerModels));
+ // TaskAssignmentManager.deleteTaskContainerMappings is called once due to
change in standby task count
+ verifyNewTaskAssignments(ImmutableSet.of(), null,
containerToTaskToMode(containerModels),
+ sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testExistingStandbyContainers() {
+ Map<String, ContainerModel> oldContainerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1,
1))), "0-0",
+ new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0),
standbyTaskModel(0, 0))), "1-0",
+ new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0),
standbyTaskModel(1, 0))));
+ setupOldTaskAssignments(oldContainerModels);
+ Map<String, ContainerModel> containerModels = new
ImmutableMap.Builder<String, ContainerModel>().put("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0))))
+ .put("1", new ContainerModel("1", ImmutableMap.of(taskName(1),
taskModel(1, 1))))
+ .put("0-0", new ContainerModel("0-0",
ImmutableMap.of(standbyTaskName(0, 0), standbyTaskModel(0, 0))))
+ .put("1-0", new ContainerModel("1-0",
ImmutableMap.of(standbyTaskName(1, 0), standbyTaskModel(1, 0))))
+ .put("0-1", new ContainerModel("0-1",
ImmutableMap.of(standbyTaskName(0, 1), standbyTaskModel(0, 1))))
+ .put("1-1", new ContainerModel("1-1",
ImmutableMap.of(standbyTaskName(1, 1), standbyTaskModel(1, 1))))
+ .build();
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(),
anyHostTaskNameToLocationId(2),
+ activeTaskToSSPs(oldContainerModels),
activeTaskToContainer(oldContainerModels));
+ verifyNewTaskAssignments(taskNameStrings(oldContainerModels,
TaskMode.Standby), null,
+ containerToTaskToMode(containerModels), sspToTasks(containerModels));
+ }
+
+ @Test
+ public void testNewStandbyTasks() {
+ Map<String, ContainerModel> oldContainerModels =
+ ImmutableMap.of("0", new ContainerModel("0",
ImmutableMap.of(taskName(0), taskModel(0, 0))), "1",
+ new ContainerModel("1", ImmutableMap.of(taskName(1), taskModel(1,
1))), "0-0",
+ new ContainerModel("0-0", ImmutableMap.of(standbyTaskName(0, 0),
standbyTaskModel(0, 0))), "1-0",
+ new ContainerModel("1-0", ImmutableMap.of(standbyTaskName(1, 0),
standbyTaskModel(1, 0))));
+ setupOldTaskAssignments(oldContainerModels);
+ Map<String, ContainerModel> containerModels = new
ImmutableMap.Builder<String, ContainerModel>().put("0",
+ new ContainerModel("0", ImmutableMap.of(taskName(0), taskModel(0, 0))))
+ .put("1", new ContainerModel("1", ImmutableMap.of(taskName(1),
taskModel(1, 1))))
+ .put("2", new ContainerModel("2", ImmutableMap.of(taskName(2),
taskModel(2, 2))))
+ .put("0-0", new ContainerModel("0-0",
ImmutableMap.of(standbyTaskName(0, 0), standbyTaskModel(0, 0))))
+ .put("1-0", new ContainerModel("1-0",
ImmutableMap.of(standbyTaskName(1, 0), standbyTaskModel(1, 0))))
+ .put("2-0", new ContainerModel("2-0",
ImmutableMap.of(standbyTaskName(2, 0), standbyTaskModel(2, 0))))
+ .build();
+ runAndCheckNewJobModel(config(), containerModels);
+ verifyGrouperMetadata(anyHostProcessorIdToLocationId(),
anyHostTaskNameToLocationId(2),
+ activeTaskToSSPs(oldContainerModels),
activeTaskToContainer(oldContainerModels));
+ //noinspection unchecked: ArgumentCaptor doesn't ideally handle multiple
levels of generics, so need cast
+ ArgumentCaptor<Iterable<String>> deleteTaskContainerMappingsCaptor =
+ (ArgumentCaptor<Iterable<String>>) (ArgumentCaptor<?>)
ArgumentCaptor.forClass(Iterable.class);
+ verify(this.taskAssignmentManager, times(2)).deleteTaskContainerMappings(
+ deleteTaskContainerMappingsCaptor.capture());
+ assertEquals(taskNameStrings(oldContainerModels, TaskMode.Active),
+
ImmutableSet.copyOf(deleteTaskContainerMappingsCaptor.getAllValues().get(0)));
+ assertEquals(taskNameStrings(oldContainerModels, TaskMode.Standby),
+
ImmutableSet.copyOf(deleteTaskContainerMappingsCaptor.getAllValues().get(1)));
+
verify(this.taskPartitionAssignmentManager).delete(allSSPs(containerModels));
+ verify(this.taskPartitionAssignmentManager).delete(any());
+
verify(this.taskAssignmentManager).writeTaskContainerMappings(containerToTaskToMode(containerModels));
+ verify(this.taskAssignmentManager).writeTaskContainerMappings(any());
+
verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(sspToTasks(containerModels));
+
verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(any());
+ }
+
+ private void runAndCheckNewJobModel(Config config, Map<String, ContainerModel> containerModels) {
+ JobModel jobModel = new JobModel(config, containerModels);
+ when(this.jobModelCalculator.calculateJobModel(eq(config), eq(this.changelogPartitionMapping),
+ eq(this.streamMetadataCache), this.grouperMetadataArgumentCaptor.capture())).thenReturn(jobModel);
+ JobModel actualJobModel = this.jobModelHelper.newJobModel(config, this.changelogPartitionMapping);
+ assertEquals(jobModel, actualJobModel);
+ }
+
+ private void setupOldTaskAssignments(Map<String, ContainerModel> oldContainerModels) {
+ when(this.taskAssignmentManager.readTaskAssignment()).thenReturn(taskNameStringToContainer(oldContainerModels));
+ when(this.taskAssignmentManager.readTaskModes()).thenReturn(taskNameToMode(oldContainerModels));
+ when(this.taskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(sspToTasks(oldContainerModels));
+ }
+
+ private static Config config() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_CONTAINER_COUNT, Integer.toString(NUM_CONTAINERS));
+ return new MapConfig(configMap);
+ }
+
+ private static TaskName taskName(int id) {
+ return new TaskName("Partition " + id);
+ }
+
+ private static TaskModel taskModel(int id, int partitionForSystemStream0) {
+ return taskModel(id,
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0))));
+ }
+
+ private static TaskModel taskModel(int id, int partitionForSystemStream0, int partitionForSystemStream1) {
+ return taskModel(id,
+ ImmutableSet.of(new SystemStreamPartition(SYSTEM_STREAM0, new Partition(partitionForSystemStream0)),
+ new SystemStreamPartition(SYSTEM_STREAM1, new Partition(partitionForSystemStream1))));
+ }
+
+ private static TaskModel taskModel(int id, Set<SystemStreamPartition> systemStreamPartitions) {
+ return new TaskModel(taskName(id), systemStreamPartitions, new Partition(id));
+ }
+
+ private static TaskName standbyTaskName(int taskId, int standbyId) {
+ return new TaskName(String.format("Standby-Partition %s-%s", taskId,
standbyId));
+ }
+
+ private static TaskModel standbyTaskModel(int taskId, int standbyId) {
+ TaskName taskName = new TaskName(String.format("Standby-Partition %s-%s",
taskId, standbyId));
+ Set<SystemStreamPartition> ssps = ImmutableSet.of(new
SystemStreamPartition(SYSTEM_STREAM0, new Partition(taskId)));
+ return new TaskModel(taskName, ssps, new Partition(taskId),
TaskMode.Standby);
+ }
+
+ private static Set<SystemStreamPartition> allSSPs(Map<String, ContainerModel> containerModels) {
+ return taskModelStream(containerModels).flatMap(taskModel -> taskModel.getSystemStreamPartitions().stream())
+ .collect(Collectors.toSet());
+ }
+
+ private static Map<String, Map<String, TaskMode>> containerToTaskToMode(Map<String, ContainerModel> containerModels) {
+ return containerModels.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()
+ .getTasks()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(taskModelEntry -> taskModelEntry.getKey().getTaskName(),
+ taskModelEntry -> taskModelEntry.getValue().getTaskMode()))));
+ }
+
+ private static Stream<TaskModel> taskModelStream(Map<String, ContainerModel> containerModels) {
+ return containerModels.values().stream().flatMap(containerModel -> containerModel.getTasks().values().stream());
+ }
+
+ private static Set<TaskName> taskNames(Map<String, ContainerModel> containerModels, TaskMode taskMode) {
+ return taskModelStream(containerModels).filter(taskModel -> taskMode.equals(taskModel.getTaskMode()))
+ .map(TaskModel::getTaskName)
+ .collect(Collectors.toSet());
+ }
+
+ private static Set<String> taskNameStrings(Map<String, ContainerModel> containerModels, TaskMode taskMode) {
+ return taskNames(containerModels, taskMode).stream().map(TaskName::getTaskName).collect(Collectors.toSet());
+ }
+
+ private static Map<TaskName, List<SystemStreamPartition>> activeTaskToSSPs(
+ Map<String, ContainerModel> containerModels) {
+ return taskModelStream(containerModels).filter(taskModel -> TaskMode.Active.equals(taskModel.getTaskMode()))
+ .collect(Collectors.toMap(TaskModel::getTaskName,
+ taskModel -> ImmutableList.copyOf(taskModel.getSystemStreamPartitions())));
+ }
+
+ private static Map<TaskName, TaskMode> taskNameToMode(Map<String, ContainerModel> containerModels) {
+ return taskModelStream(containerModels).collect(Collectors.toMap(TaskModel::getTaskName, TaskModel::getTaskMode));
+ }
+
+ private static Map<TaskName, String> activeTaskToContainer(Map<String, ContainerModel> containerModels) {
+ Map<TaskName, String> taskToContainerMap = new HashMap<>();
+ containerModels.values()
+ .forEach(containerModel -> containerModel.getTasks()
+ .values()
+ .stream()
+ .filter(taskModel -> TaskMode.Active.equals(taskModel.getTaskMode()))
+ .forEach(taskModel -> taskToContainerMap.put(taskModel.getTaskName(), containerModel.getId())));
+ return taskToContainerMap;
+ }
+
+ private static Map<String, String> taskNameStringToContainer(Map<String, ContainerModel> containerModels) {
+ return activeTaskToContainer(containerModels).entrySet()
+ .stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().getTaskName(), Map.Entry::getValue));
+ }
+
+ private static Map<SystemStreamPartition, List<String>> sspToTasks(Map<String, ContainerModel> containerModels) {
+ Map<SystemStreamPartition, List<String>> sspToTasksMap = new HashMap<>();
+ taskModelStream(containerModels).forEach(taskModel -> taskModel.getSystemStreamPartitions().forEach(ssp -> {
+ sspToTasksMap.putIfAbsent(ssp, new ArrayList<>());
+ sspToTasksMap.get(ssp).add(taskModel.getTaskName().getTaskName());
+ }));
+ return sspToTasksMap;
+ }
+
+ private static Map<String, LocationId> processorIdToLocationId(Map<String, String> processorIdToHostOverrides) {
+ Map<String, LocationId> processorLocality = new HashMap<>(anyHostProcessorIdToLocationId());
+ processorIdToHostOverrides.forEach((processorId, host) -> processorLocality.put(processorId, new LocationId(host)));
+ return processorLocality;
+ }
+
+ private static Map<String, LocationId> anyHostProcessorIdToLocationId() {
+ return IntStream.range(0, NUM_CONTAINERS)
+ .boxed()
+ .collect(Collectors.toMap(i -> Integer.toString(i), i -> new LocationId("ANY_HOST")));
+ }
+
+ private static Map<TaskName, LocationId> anyHostTaskNameToLocationId(int numTasks) {
+ return IntStream.range(0, numTasks)
+ .boxed()
+ .collect(Collectors.toMap(TestJobModelHelper::taskName, i -> new LocationId("ANY_HOST")));
+ }
+
+ private void verifyGrouperMetadata(Map<String, LocationId> processorLocality, Map<TaskName, LocationId> taskLocality,
+ Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignment,
+ Map<TaskName, String> previousTaskToProcessorAssignment) {
+ GrouperMetadata grouperMetadata = this.grouperMetadataArgumentCaptor.getValue();
+ assertEquals(processorLocality, grouperMetadata.getProcessorLocality());
+ assertEquals(taskLocality, grouperMetadata.getTaskLocality());
+ assertPreviousTaskToSSPAssignment(previousTaskToSSPAssignment, grouperMetadata.getPreviousTaskToSSPAssignment());
+ assertEquals(previousTaskToProcessorAssignment, grouperMetadata.getPreviousTaskToProcessorAssignment());
+ }
+
+ private void verifyNewTaskAssignments(Set<String> deleteTaskContainerMappings,
+ Set<SystemStreamPartition> taskPartitionAssignmentDeletes,
+ Map<String, Map<String, TaskMode>> writeTaskContainerMappings,
+ Map<SystemStreamPartition, List<String>> writeTaskPartitionAssignments) {
+ if (deleteTaskContainerMappings != null) {
+ //noinspection unchecked: ArgumentCaptor doesn't ideally handle multiple levels of generics, so need cast
+ ArgumentCaptor<Iterable<String>> deleteTaskContainerMappingsCaptor =
+ (ArgumentCaptor<Iterable<String>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterable.class);
+ verify(this.taskAssignmentManager).deleteTaskContainerMappings(deleteTaskContainerMappingsCaptor.capture());
+ // order of Iterable argument shouldn't matter
+ assertEquals(deleteTaskContainerMappings, ImmutableSet.copyOf(deleteTaskContainerMappingsCaptor.getValue()));
+ } else {
+ verify(this.taskAssignmentManager, never()).deleteTaskContainerMappings(any());
+ }
+ if (taskPartitionAssignmentDeletes != null) {
+ verify(this.taskPartitionAssignmentManager).delete(taskPartitionAssignmentDeletes);
+ // verifies no calls to delete other than the one above
+ verify(this.taskPartitionAssignmentManager).delete(any());
+ } else {
+ verify(this.taskPartitionAssignmentManager, never()).delete(any());
+ }
+
+ verify(this.taskAssignmentManager).writeTaskContainerMappings(writeTaskContainerMappings);
+ // verifies no calls to writeTaskContainerMappings other than the one above
+ verify(this.taskAssignmentManager).writeTaskContainerMappings(any());
+
+ verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(writeTaskPartitionAssignments);
+ // verifies no calls to writeTaskPartitionAssignments other than the one above
+ verify(this.taskPartitionAssignmentManager).writeTaskPartitionAssignments(any());
+ }
+
+ /**
+ * Order of SSPs in values of the map shouldn't matter, and the order is also non-deterministic, so just check the
+ * contents of the lists.
+ */
+ private static void assertPreviousTaskToSSPAssignment(Map<TaskName, List<SystemStreamPartition>> expected,
+ Map<TaskName, List<SystemStreamPartition>> actual) {
+ assertEquals(expected.size(), actual.size());
+ expected.forEach((taskName, sspList) -> assertContentsEqualNoDuplicates(sspList, actual.get(taskName)));
+ }
+
+ private static <T> void assertContentsEqualNoDuplicates(List<T> expected, List<T> actual) {
+ Set<T> expectedSet = ImmutableSet.copyOf(expected);
+ Set<T> actualSet = ImmutableSet.copyOf(actual);
+ assertEquals(expectedSet.size(), expected.size());
+ assertEquals(actualSet.size(), actual.size());
+ assertEquals(expectedSet, actualSet);
+ }
+
+ private static ProcessorLocality processorLocality(String id, String host) {
+ ProcessorLocality processorLocality = mock(ProcessorLocality.class);
+ when(processorLocality.id()).thenReturn(id);
+ when(processorLocality.host()).thenReturn(host);
+ return processorLocality;
+ }
+}
\ No newline at end of file
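The same pattern applies to the other extracted class: construction order matches the test setup() above, and newJobModel drives the read-calculate-write cycle that the tests verify. A minimal sketch (not part of this commit); the manager, cache, config, and mapping instances are assumed to be supplied by the coordinator, and passing JobModelCalculator.INSTANCE as the last argument is an assumption based on the calculator's singleton:

    // Hypothetical wiring of the extracted helper (illustrative only).
    JobModelHelper jobModelHelper = new JobModelHelper(localityManager, taskAssignmentManager,
        taskPartitionAssignmentManager, streamMetadataCache, JobModelCalculator.INSTANCE);
    // Builds GrouperMetadata from the stored locality and assignments, delegates to the
    // calculator, then persists the new task-container and task-partition mappings.
    JobModel jobModel = jobModelHelper.newJobModel(config, changelogPartitionMapping);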
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
deleted file mode 100644
index 83de0cf..0000000
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.*;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.RegExTopicGenerator;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.container.grouper.task.GroupByContainerCount;
-import org.apache.samza.container.grouper.task.GrouperMetadata;
-import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
-import org.apache.samza.container.grouper.task.TaskAssignmentManager;
-import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.ProcessorLocality;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.LocalityModel;
-import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.runtime.LocationId;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.testUtils.MockHttpServer;
-import org.apache.samza.util.ConfigUtil;
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import scala.collection.JavaConversions;
-
-/**
- * Unit tests for {@link JobModelManager}
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class, ConfigUtil.class})
-public class TestJobModelManager {
- private final TaskAssignmentManager mockTaskManager = mock(TaskAssignmentManager.class);
- private final Map<String, Map<String, String>> localityMappings = new HashMap<>();
- private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- private final SystemStream inputStream = new SystemStream("test-system", "test-stream");
- private final SystemStreamMetadata.SystemStreamPartitionMetadata mockSspMetadata = mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class);
- private final Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> mockSspMetadataMap = Collections.singletonMap(new Partition(0), mockSspMetadata);
- private final SystemStreamMetadata mockStreamMetadata = mock(SystemStreamMetadata.class);
- private final scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> mockStreamMetadataMap = new scala.collection.immutable.Map.Map1(inputStream, mockStreamMetadata);
- private final StreamMetadataCache mockStreamMetadataCache = mock(StreamMetadataCache.class);
- private final scala.collection.immutable.Set<SystemStream> inputStreamSet = JavaConversions.asScalaSet(Collections.singleton(inputStream)).toSet();
-
- private JobModelManager jobModelManager;
-
- @Before
- public void setup() throws Exception {
- when(mockStreamMetadataCache.getStreamMetadata(argThat(new ArgumentMatcher<scala.collection.immutable.Set<SystemStream>>() {
- @Override
- public boolean matches(Object argument) {
- scala.collection.immutable.Set<SystemStream> set = (scala.collection.immutable.Set<SystemStream>) argument;
- return set.equals(inputStreamSet);
- }
- }), anyBoolean())).thenReturn(mockStreamMetadataMap);
- when(mockStreamMetadata.getSystemStreamPartitionMetadata()).thenReturn(mockSspMetadataMap);
- PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(mockTaskManager);
- when(mockTaskManager.readTaskAssignment()).thenReturn(Collections.EMPTY_MAP);
- }
-
- @Test
- public void testGetGrouperMetadata() {
- // Mocking setup.
- LocalityManager mockLocalityManager = mock(LocalityManager.class);
- TaskAssignmentManager mockTaskAssignmentManager = Mockito.mock(TaskAssignmentManager.class);
- TaskPartitionAssignmentManager mockTaskPartitionAssignmentManager = Mockito.mock(TaskPartitionAssignmentManager.class);
-
- SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new Partition(1));
- SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new Partition(2));
-
- when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
-
- Map<SystemStreamPartition, List<String>> taskToSSPAssignments = ImmutableMap.of(testSystemStreamPartition1, ImmutableList.of("task-0", "task-1"),
- testSystemStreamPartition2, ImmutableList.of("task-2", "task-3"));
-
- Map<String, String> taskAssignment = ImmutableMap.of("task-0", "0");
-
- // Mock the task to partition assignment.
- when(mockTaskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(taskToSSPAssignments);
-
- // Mock the container to task assignment.
- when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
- when(mockTaskAssignmentManager.readTaskModes()).thenReturn(ImmutableMap.of(new TaskName("task-0"), TaskMode.Active, new TaskName("task-1"), TaskMode.Active, new TaskName("task-2"), TaskMode.Active, new TaskName("task-3"), TaskMode.Active));
-
- GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);
-
- verify(mockLocalityManager).readLocality();
- verify(mockTaskAssignmentManager).readTaskAssignment();
-
- Assert.assertEquals(ImmutableMap.of("0", new LocationId("abc-affinity")), grouperMetadata.getProcessorLocality());
- Assert.assertEquals(ImmutableMap.of(new TaskName("task-0"), new LocationId("abc-affinity")), grouperMetadata.getTaskLocality());
-
- Map<TaskName, List<SystemStreamPartition>> expectedTaskToSSPAssignments = ImmutableMap.of(new TaskName("task-0"), ImmutableList.of(testSystemStreamPartition1),
- new TaskName("task-1"), ImmutableList.of(testSystemStreamPartition1),
- new TaskName("task-2"), ImmutableList.of(testSystemStreamPartition2),
- new TaskName("task-3"), ImmutableList.of(testSystemStreamPartition2));
- Assert.assertEquals(expectedTaskToSSPAssignments, grouperMetadata.getPreviousTaskToSSPAssignment());
- }
-
- @Test
- public void testGetProcessorLocalityAllEntriesExisting() {
- Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
-
- LocalityManager mockLocalityManager = mock(LocalityManager.class);
- when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of(
- "0", new ProcessorLocality("0", "0-affinity"),
- "1", new ProcessorLocality("1", "1-affinity"))));
-
- Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
-
- verify(mockLocalityManager).readLocality();
- ImmutableMap<String, LocationId> expected =
- ImmutableMap.of("0", new LocationId("0-affinity"), "1", new LocationId("1-affinity"));
- Assert.assertEquals(expected, processorLocality);
- }
-
- @Test
- public void testGetProcessorLocalityNewContainer() {
- Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
-
- LocalityManager mockLocalityManager = mock(LocalityManager.class);
- // 2 containers, but only return 1 existing mapping
- when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc-affinity"))));
-
- Map<String, LocationId> processorLocality = JobModelManager.getProcessorLocality(config, mockLocalityManager);
-
- verify(mockLocalityManager).readLocality();
- ImmutableMap<String, LocationId> expected = ImmutableMap.of(
- // found entry in existing locality
- "0", new LocationId("abc-affinity"),
- // no entry in existing locality
- "1", new LocationId("ANY_HOST"));
- Assert.assertEquals(expected, processorLocality);
- }
-
- @Test
- public void testUpdateTaskAssignments() {
- // Mocking setup.
- JobModel mockJobModel = Mockito.mock(JobModel.class);
- GrouperMetadataImpl mockGrouperMetadata = Mockito.mock(GrouperMetadataImpl.class);
- TaskAssignmentManager mockTaskAssignmentManager = Mockito.mock(TaskAssignmentManager.class);
- TaskPartitionAssignmentManager mockTaskPartitionAssignmentManager = Mockito.mock(TaskPartitionAssignmentManager.class);
-
- SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new Partition(1));
- SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new Partition(2));
- SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(new SystemStream("test-system-2", "test-stream-2"), new Partition(1));
- SystemStreamPartition testSystemStreamPartition4 = new SystemStreamPartition(new SystemStream("test-system-3", "test-stream-3"), new Partition(2));
-
- Map<TaskName, TaskModel> taskModelMap = new HashMap<>();
- taskModelMap.put(new TaskName("task-1"), new TaskModel(new TaskName("task-1"), ImmutableSet.of(testSystemStreamPartition1), new Partition(0)));
- taskModelMap.put(new TaskName("task-2"), new TaskModel(new TaskName("task-2"), ImmutableSet.of(testSystemStreamPartition2), new Partition(1)));
- taskModelMap.put(new TaskName("task-3"), new TaskModel(new TaskName("task-3"), ImmutableSet.of(testSystemStreamPartition3), new Partition(2)));
- taskModelMap.put(new TaskName("task-4"), new TaskModel(new TaskName("task-4"), ImmutableSet.of(testSystemStreamPartition4), new Partition(3)));
- ContainerModel containerModel = new ContainerModel("test-container-id", taskModelMap);
- Map<String, ContainerModel> containerMapping = ImmutableMap.of("test-container-id", containerModel);
-
- when(mockJobModel.getContainers()).thenReturn(containerMapping);
- when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>());
- Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMappings(Mockito.any());
-
- JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, mockGrouperMetadata);
-
- Set<String> taskNames = new HashSet<String>();
- taskNames.add("task-4");
- taskNames.add("task-2");
- taskNames.add("task-3");
- taskNames.add("task-1");
-
- Set<SystemStreamPartition> systemStreamPartitions = new HashSet<>();
- systemStreamPartitions.add(new SystemStreamPartition(new
SystemStream("test-system-0", "test-stream-0"), new Partition(1)));
- systemStreamPartitions.add(new SystemStreamPartition(new
SystemStream("test-system-1", "test-stream-1"), new Partition(2)));
- systemStreamPartitions.add(new SystemStreamPartition(new
SystemStream("test-system-2", "test-stream-2"), new Partition(1)));
- systemStreamPartitions.add(new SystemStreamPartition(new
SystemStream("test-system-3", "test-stream-3"), new Partition(2)));
-
- // Verifications
- verify(mockJobModel, atLeast(1)).getContainers();
-
verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-
verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
- ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active,
"task-3", TaskMode.Active, "task-4", TaskMode.Active)));
-
- // Verify that the old, stale partition mappings had been purged in the
coordinator stream.
- verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
-
- // Verify that the new task to partition assignment is stored in the
coordinator stream.
-
verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
- testSystemStreamPartition1, ImmutableList.of("task-1"),
- testSystemStreamPartition2, ImmutableList.of("task-2"),
- testSystemStreamPartition3, ImmutableList.of("task-3"),
- testSystemStreamPartition4, ImmutableList.of("task-4")
- ));
- }
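As the verifications in testUpdateTaskAssignments spell out, the update is a purge-then-write protocol against the coordinator stream: delete the previous task-to-container mappings, write the new container mapping with task modes, delete the stale SSP-to-task mappings for the SSPs in the new model, then write the fresh assignments. A rough sketch of that ordering, with plain in-memory maps standing in for the coordinator-stream managers (all types and names here are hypothetical stand-ins, not the Samza API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentUpdateSketch {
  // Coordinator-stream state, modeled as in-memory maps for illustration.
  final Map<String, String> taskToContainer = new HashMap<>();
  final Map<String, List<String>> sspToTasks = new HashMap<>();

  // newModel: container id -> (task name -> the task's input SSP).
  void updateTaskAssignments(Map<String, Map<String, String>> newModel) {
    // 1. Purge the previous task-to-container mappings before rewriting them.
    taskToContainer.clear();

    // 2. Purge the stale SSP-to-task mappings for every SSP in the new model,
    //    mirroring the delete(systemStreamPartitions) verification above.
    Set<String> ssps = new HashSet<>();
    newModel.values().forEach(tasks -> ssps.addAll(tasks.values()));
    ssps.forEach(sspToTasks::remove);

    // 3. Write the fresh task-to-container and SSP-to-task assignments.
    newModel.forEach((container, tasks) -> tasks.forEach((task, ssp) -> {
      taskToContainer.put(task, container);
      sspToTasks.computeIfAbsent(ssp, k -> new ArrayList<>()).add(task);
    }));
  }

  public static void main(String[] args) {
    AssignmentUpdateSketch sketch = new AssignmentUpdateSketch();
    Map<String, String> tasks = new HashMap<>();
    tasks.put("task-1", "test-system-0.test-stream-0.1");
    tasks.put("task-2", "test-system-1.test-stream-1.2");
    sketch.updateTaskAssignments(Map.of("test-container-id", tasks));
    System.out.println(sketch.taskToContainer); // both tasks map to test-container-id
    System.out.println(sketch.sspToTasks);      // each SSP maps to its single task
  }
}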
-
-  @Test
-  public void testJobModelContainsLatestTaskInputsWhenEnabledRegexTopicRewriter() {
-    ImmutableMap<String, String> rewriterConfig = ImmutableMap.of(
-        JobConfig.CONFIG_REWRITERS, "regexTopicRewriter",
-        String.format(JobConfig.CONFIG_REWRITER_CLASS, "regexTopicRewriter"), RegExTopicGenerator.class.getCanonicalName()
-    );
-
-    Config config = new MapConfig(rewriterConfig);
-    String taskInputMatchedRegex = inputStream.getSystem() + "." + inputStream.getStream();
-    Config refreshedConfig = new MapConfig(ImmutableMap.<String, String>builder()
-        .putAll(rewriterConfig)
-        .put(TaskConfig.INPUT_STREAMS, taskInputMatchedRegex)
-        .build()
-    );
-
-    PowerMockito.mockStatic(ConfigUtil.class);
-    PowerMockito.when(ConfigUtil.applyRewriter(config, "regexTopicRewriter")).thenReturn(refreshedConfig);
-
-    Map<TaskName, Integer> changeLogPartitionMapping = new HashMap<>();
-    GrouperMetadata grouperMetadata = mock(GrouperMetadata.class);
-
-    JobModel jobModel =
-        JobModelManager.readJobModel(config, changeLogPartitionMapping, mockStreamMetadataCache, grouperMetadata);
-
-    Assert.assertNotNull(jobModel);
-    Assert.assertEquals(taskInputMatchedRegex, jobModel.getConfig().get(TaskConfig.INPUT_STREAMS));
-  }
-}
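For context on the last deleted test: it exercises config rewriting, where ConfigUtil.applyRewriter(config, rewriterName) returns a refreshed config in which the rewriter (here RegExTopicGenerator) has expanded task.inputs to the matching streams. A hedged sketch of how calling code might fold in every configured rewriter before computing a job model; the literal "job.config.rewriters" key and the comma-separated format are assumptions based on how the test populates JobConfig.CONFIG_REWRITERS:

import org.apache.samza.config.Config;
import org.apache.samza.util.ConfigUtil;

public class RewriterSketch {
  /**
   * Apply each configured rewriter in order, so that downstream steps (task
   * grouping, job model calculation) see the refreshed task.inputs value.
   */
  static Config applyAllRewriters(Config config) {
    String rewriters = config.get("job.config.rewriters"); // assumed key behind JobConfig.CONFIG_REWRITERS
    if (rewriters == null || rewriters.isEmpty()) {
      return config; // nothing to rewrite
    }
    Config current = config;
    for (String rewriterName : rewriters.split(",")) {
      current = ConfigUtil.applyRewriter(current, rewriterName.trim());
    }
    return current;
  }
}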
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
similarity index 89%
rename from samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
rename to samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
index 123872b..95856fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobModelManager.scala
@@ -19,8 +19,6 @@
package org.apache.samza.coordinator
-import java.util
-
import org.apache.samza.Partition
import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
import org.apache.samza.config._
@@ -28,25 +26,25 @@ import org.apache.samza.container.{SamzaContainer, TaskName}
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
-import org.apache.samza.job.MockJobFactory
import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
import org.apache.samza.util.HttpUtil
import org.junit.Assert._
import org.junit.{After, Before, Test}
-import org.mockito.Mockito.{mock, when}
import org.scalatest.{FlatSpec, PrivateMethodTester}
+import java.util
import scala.collection.JavaConverters._
-import scala.collection.immutable
-
-class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
+/**
+ * This used to be called TestJobCoordinator, but JobCoordinator is currently an interface, so the old name was
+ * misleading. The object under test is actually JobModelManager.
+ */
+class TestJobModelManager extends FlatSpec with PrivateMethodTester {
/**
* Builds a coordinator from config, and then compares it with what was
* expected. We simulate having a checkpoint manager that has 2 task
@@ -231,29 +229,6 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
assertEquals(jobModel, coordinator.jobModel)
}
-  /**
-   * Test with a JobFactory other than ProcessJobFactory or ThreadJobFactory so that
-   * JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX does not match.
-   */
-  @Test
-  def testWithPartitionAssignmentWithMockJobFactory {
-    val config = getTestConfig(classOf[MockJobFactory])
-    val systemStream = new SystemStream("test", "stream1")
-    val streamMetadataCache = mock(classOf[StreamMetadataCache])
-    when(streamMetadataCache.getStreamMetadata(Set(systemStream), true)).thenReturn(
-      Map(systemStream -> new SystemStreamMetadata(systemStream.getStream,
-        Map(new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
-          new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
-        ).asJava)))
-    val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
-    val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
-
-    val allSSP = JobModelManager invokePrivate getInputStreamPartitions(config, streamMetadataCache)
-    val matchedSSP = JobModelManager invokePrivate getMatchedInputStreamPartitions(config, streamMetadataCache)
-    assertEquals(matchedSSP, allSSP)
-  }
-
  def getTestConfig(clazz : Class[_]) = {
    val config = new MapConfig(Map(
      TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName,