job model generation cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f15b9071 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f15b9071 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f15b9071 Branch: refs/heads/samza-standalone Commit: f15b907127774bfb38f753a906b563560c6f657a Parents: be6be8c Author: navina <[email protected]> Authored: Fri Dec 23 17:00:04 2016 -0800 Committer: navina <[email protected]> Committed: Fri Dec 23 17:00:04 2016 -0800 ---------------------------------------------------------------------- .../grouper/task/BalancingTaskNameGrouper.java | 9 ++++++++- .../main/java/org/apache/samza/zk/ZkJobCoordinator.java | 12 +++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java index f8295c8..1946271 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java @@ -18,6 +18,7 @@ */ package org.apache.samza.container.grouper.task; +import java.util.Collections; import java.util.Set; import org.apache.samza.container.LocalityManager; import org.apache.samza.job.model.ContainerModel; @@ -54,5 +55,11 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper { * @param localityManager provides a persisted task to container map to use as a baseline * @return the grouped tasks in the form of ContainerModels */ - Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager); + default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) { + return Collections.<ContainerModel>emptySet(); + } + + default Set<ContainerModel> balance(Set<Integer> containerIds, Set<TaskModel> tasks, LocalityManager localityManager) { + return Collections.<ContainerModel>emptySet(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 8c36ff2..3014c1b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -8,6 +8,8 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.JobModelManager$; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.processor.SamzaContainerController; @@ -41,7 +43,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { private Config config; private ZkKeyBuilder keyBuilder; private final ScheduleAfterDebounceTime debounceTimer; - //JobModelManager jobModelManager; + JobModelManager jobModelManager; public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) { this.zkUtils = zkUtils; @@ -75,7 +77,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock .instance()); - //jobModelManager = //JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); + jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null); //////////////////////////////////////////////////////////////////////////////////////////// } @@ -132,7 +134,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { Map<String, String> configMap = new HashMap<>(); Map<Integer, ContainerModel> containers = new HashMap<>(); MapConfig config = new MapConfig(configMap); - JobModel jobModel = new JobModel(config, containers); + //JobModel jobModel = new JobModel(config, containers); + JobModel jobModel = jobModelManager.jobModel(); log.info("pid=" + processorId + "Generated jobModel: " + jobModel); @@ -188,11 +191,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener { // ????? JobModel jobModel = getJobModel(); log.info("pid=" + processorId + "got the new job model =" + jobModel); - /* + containerController.startContainer( jobModel.getContainers().get(processorId), jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions); - */ } }
