Repository: samza Updated Branches: refs/heads/master a989c08b1 -> d7fc811d6
SAMZA-1175 - Removing CoordinationService from JobCoordinatorFactory interface Removing CoordinationService from JobCoordinatorFactory interface Author: navina <[email protected]> Reviewers: Xinyu Liu <[email protected]>,Boris Shkolnik <[email protected]> Closes #102 from navina/SAMZA-1175 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d7fc811d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d7fc811d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d7fc811d Branch: refs/heads/master Commit: d7fc811d64255d9a32ca3ecc3484afdef4e5caf5 Parents: a989c08 Author: Navina Ramesh <[email protected]> Authored: Wed Mar 29 16:25:16 2017 -0700 Committer: nramesh <[email protected]> Committed: Wed Mar 29 16:25:16 2017 -0700 ---------------------------------------------------------------------- .../coordinator/JobCoordinatorFactory.java | 3 +-- .../apache/samza/processor/StreamProcessor.java | 11 +-------- .../StandaloneJobCoordinatorFactory.java | 4 +--- .../org/apache/samza/zk/ZkJobCoordinator.java | 25 +++++++++++++------- .../samza/zk/ZkJobCoordinatorFactory.java | 6 ++--- 5 files changed, 22 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java index 056bdb1..d15bce1 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java @@ -32,6 +32,5 @@ public interface JobCoordinatorFactory { * pause the container and add/remove tasks * @return An instance of IJobCoordinator */ - JobCoordinator getJobCoordinator(int processorId, Config config, - SamzaContainerController containerController, CoordinationUtils coordinationUtils); + JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 3a62275..a39c3b9 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -23,8 +23,6 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.metrics.MetricsReporter; @@ -119,24 +117,17 @@ public class StreamProcessor { updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId)); Config updatedConfig = new MapConfig(updatedConfigMap); - SamzaContainerController containerController = new SamzaContainerController( taskFactory, new TaskConfigJava(updatedConfig).getShutdownMs(), String.valueOf(processorId), customMetricsReporters); - CoordinationUtils jobCooridanationService = Util. - <CoordinationServiceFactory>getObj( - new JobCoordinatorConfig(updatedConfig) - .getJobCoordinationServiceFactoryClassName()) - .getCoordinationService("groupId", String.valueOf(processorId), updatedConfig); - this.jobCoordinator = Util. <JobCoordinatorFactory>getObj( new JobCoordinatorConfig(updatedConfig) .getJobCoordinatorFactoryClassName()) - .getJobCoordinator(processorId, updatedConfig, containerController, jobCooridanationService); + .getJobCoordinator(processorId, updatedConfig, containerController); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java index 3588dce..7ca85c0 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java @@ -19,15 +19,13 @@ package org.apache.samza.standalone; import org.apache.samza.config.Config; -import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.processor.SamzaContainerController; public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory { @Override - public JobCoordinator getJobCoordinator(int processorId, Config config, - SamzaContainerController containerController, CoordinationUtils coordinationUtils) { + public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { return new StandaloneJobCoordinator(processorId, config, containerController); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 87d6bac..b88753f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -18,15 +18,11 @@ */ package org.apache.samza.zk; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; @@ -40,6 +36,13 @@ import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * JobCoordinator for stand alone processor managed via Zookeeper. */ @@ -61,8 +64,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private String newJobModelVersion; // version published in ZK (by the leader) private JobModel jobModel; - public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, - SamzaContainerController containerController, CoordinationUtils coordinationUtils) { + public ZkJobCoordinator(int processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, + SamzaContainerController containerController) { this.zkUtils = zkUtils; this.keyBuilder = zkUtils.getKeyBuilder(); this.debounceTimer = debounceTimer; @@ -70,7 +73,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { this.containerController = containerController; this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this); this.config = config; - this.coordinationUtils = coordinationUtils; + this.coordinationUtils = Util. + <CoordinationServiceFactory>getObj( + new JobCoordinatorConfig(config) + .getJobCoordinationServiceFactoryClassName()) + .getCoordinationService(groupId, String.valueOf(processorId), config); streamMetadataCache = getStreamMetadataCache(); } http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 22ead65..915866d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -23,7 +23,6 @@ import org.I0Itec.zkclient.ZkClient; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.ZkConfig; -import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorFactory; import org.apache.samza.processor.SamzaContainerController; @@ -37,7 +36,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { * @return An instance of IJobCoordinator */ @Override - public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController, CoordinationUtils coordinationUtils) { + public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) { JobConfig jobConfig = new JobConfig(config); String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get()); ZkConfig zkConfig = new ZkConfig(config); @@ -47,6 +46,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { return new ZkJobCoordinator( processorId, + "groupId", // TODO: Usage of groupId to be resolved in SAMZA-1173 config, debounceTimer, new ZkUtils( @@ -55,6 +55,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { zkClient, zkConfig.getZkConnectionTimeoutMs() ), - containerController, coordinationUtils); + containerController); } }
