added ContainerIds list to the group() method of a grouper
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e37f9105 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e37f9105 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e37f9105 Branch: refs/heads/samza-standalone Commit: e37f91053e7b84a08e250b6ee3b4fd9348a2c624 Parents: 8cbf014 Author: navina <[email protected]> Authored: Fri Dec 23 17:01:52 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 17:01:52 2016 -0800 ---------------------------------------------------------------------- .../task/SimpleGroupByContainerCount.java | 27 ++++++++++++-------- .../container/grouper/task/TaskNameGrouper.java | 5 ++++ .../standalone/StandaloneJobCoordinator.java | 2 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 23 ++++++++++++----- .../java/org/apache/samza/zk/ZkKeyBuilder.java | 13 +++++++++- .../samza/coordinator/JobCoordinator.scala | 18 ++++++++----- 6 files changed, 63 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java index c9489f7..359c4ed 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SimpleGroupByContainerCount.java @@ -13,18 +13,30 @@ import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; -public class SimpleGroupByContainerCount implements BalancingTaskNameGrouper { - private final int containerCount; +public 
class SimpleGroupByContainerCount implements TaskNameGrouper { + private final int startContainerCount; public SimpleGroupByContainerCount() { - this.containerCount = 1; + this.startContainerCount = 1; } public SimpleGroupByContainerCount(int containerCount) { if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container"); - this.containerCount = containerCount; + this.startContainerCount = containerCount; } @Override public Set<ContainerModel> group(Set<TaskModel> tasks) { + List<Integer> containerIds = new ArrayList<>(startContainerCount); + for (int i = 0; i < startContainerCount; i++) { + containerIds.add(i); + } + return group(tasks, containerIds); + } + + public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) { + if(containersIds == null) + return this.group(tasks); + + int containerCount = containersIds.size(); // Sort tasks by taskName. List<TaskModel> sortedTasks = new ArrayList<>(tasks); @@ -43,14 +55,9 @@ public class SimpleGroupByContainerCount implements BalancingTaskNameGrouper { // Convert to a Set of ContainerModel Set<ContainerModel> containerModels = new HashSet<>(); for (int i = 0; i < containerCount; i++) { - containerModels.add(new ContainerModel(i, taskGroups[i])); + containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i])); } return Collections.unmodifiableSet(containerModels); } - - @Override - public Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) { - return null; - } } http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java index 59a3237..848beea 100644 --- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java @@ -18,6 +18,7 @@ */ package org.apache.samza.container.grouper.task; +import java.util.List; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; @@ -50,4 +51,8 @@ public interface TaskNameGrouper { * @return Set of containers, which contain the tasks that were passed in. */ Set<ContainerModel> group(Set<TaskModel> tasks); + + default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java index 7fe1422..5c525d1 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java @@ -90,7 +90,7 @@ public class StandaloneJobCoordinator implements JobCoordinator { * TaskNameGrouper with the LocalityManager! 
Hence, groupers should be a property of the jobcoordinator * (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) */ - this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + this.jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, null); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 3014c1b..560e19b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -1,5 +1,6 @@ package org.apache.samza.zk; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -43,6 +44,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { private Config config; private ZkKeyBuilder keyBuilder; private final ScheduleAfterDebounceTime debounceTimer; + private final StreamMetadataCache streamMetadataCache; JobModelManager jobModelManager; public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) { @@ -73,11 +75,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); } - StreamMetadataCache - streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock + streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock .instance()); - jobModelManager = 
JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + //////////////////////////////////////////////////////////////////////////////////////////// } @@ -131,10 +132,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1); log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion); - Map<String, String> configMap = new HashMap<>(); - Map<Integer, ContainerModel> containers = new HashMap<>(); - MapConfig config = new MapConfig(configMap); + //Map<String, String> configMap = new HashMap<>(); + //Map<Integer, ContainerModel> containers = new HashMap<>(); + //MapConfig config = new MapConfig(configMap); //JobModel jobModel = new JobModel(config, containers); + StringBuilder sb = new StringBuilder(); + List<Integer> containerIds = new ArrayList<>(); + for(String processor: currentProcessors){ + String zkProcessorId = keyBuilder.parseContainerIdFromProcessorId(processor); + sb.append(zkProcessorId).append(","); + containerIds.add(Integer.valueOf(zkProcessorId)); + } + log.info("processorsIds: " + sb.toString()); + + jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null, containerIds); JobModel jobModel = jobModelManager.jobModel(); log.info("pid=" + processorId + "Generated jobModel: " + jobModel); http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java index 7ad62be..42d0c86 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -1,8 +1,12 @@ package org.apache.samza.zk; +import 
org.apache.samza.SamzaException; + + public class ZkKeyBuilder { private final String pathPrefix; public static final String PROCESSORS_PATH = "processors"; + public static final String PROCESSOR_ID_PREFIX = "processor-"; public static final String JOBMODEL_VERSION_PATH = "jobModelVersion"; @@ -19,10 +23,17 @@ public class ZkKeyBuilder { public static String parseIdFromPath(String path) { if (path != null) - return path.substring(path.indexOf("processor-")); + return path.substring(path.indexOf(PROCESSOR_ID_PREFIX)); return null; } + public static String parseContainerIdFromProcessorId(String prId) { + if(prId == null) + throw new SamzaException("processor id is null"); + + return prId.substring(prId.indexOf(PROCESSOR_ID_PREFIX) + PROCESSOR_ID_PREFIX.length()); + } + public String getJobModelVersionPath() { return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH); } http://git-wip-us.apache.org/repos/asf/samza/blob/e37f9105/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index df63b97..7bf4921 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -106,7 +106,7 @@ object JobModelManager extends Logging { } } - val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor) + val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor, null) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions) jobCoordinator @@ -121,8 +121,9 @@ object JobModelManager extends Logging { changelogManager: ChangelogPartitionManager, 
localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache, - streamPartitionCountMonitor: StreamPartitionCountMonitor) = { - val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) + streamPartitionCountMonitor: StreamPartitionCountMonitor, + containerIds: java.util.List[Integer]) = { + val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache, containerIds) jobModelRef.set(jobModel) val server = new HttpServer @@ -188,7 +189,8 @@ object JobModelManager extends Logging { private def initializeJobModel(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, - streamMetadataCache: StreamMetadataCache): JobModel = { + streamMetadataCache: StreamMetadataCache, + containerIds: java.util.List[Integer]): JobModel = { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) @@ -215,7 +217,8 @@ object JobModelManager extends Logging { allSystemStreamPartitions, groups, previousChangelogMapping, - localityManager) + localityManager, + containerIds) val jobModel = jobModelGenerator() @@ -248,7 +251,8 @@ object JobModelManager extends Logging { allSystemStreamPartitions: util.Set[SystemStreamPartition], groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], - localityManager: LocalityManager): JobModel = { + localityManager: LocalityManager, + containerIds: java.util.List[Integer]): JobModel = { // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change // mapping. 
@@ -280,7 +284,7 @@ object JobModelManager extends Logging { if (containerGrouper.isInstanceOf[BalancingTaskNameGrouper]) containerGrouper.asInstanceOf[BalancingTaskNameGrouper].balance(taskModels, localityManager) else - containerGrouper.group(taskModels) + containerGrouper.group(taskModels, containerIds) } val containerMap = asScalaSet(containerModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
