Repository: samza
Updated Branches:
  refs/heads/master a989c08b1 -> d7fc811d6


SAMZA-1175 - Removing CoordinationService from JobCoordinatorFactory interface

Removing CoordinationService from JobCoordinatorFactory interface

Author: navina <[email protected]>

Reviewers: Xinyu Liu <[email protected]>,Boris Shkolnik <[email protected]>

Closes #102 from navina/SAMZA-1175


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d7fc811d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d7fc811d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d7fc811d

Branch: refs/heads/master
Commit: d7fc811d64255d9a32ca3ecc3484afdef4e5caf5
Parents: a989c08
Author: Navina Ramesh <[email protected]>
Authored: Wed Mar 29 16:25:16 2017 -0700
Committer: nramesh <[email protected]>
Committed: Wed Mar 29 16:25:16 2017 -0700

----------------------------------------------------------------------
 .../coordinator/JobCoordinatorFactory.java      |  3 +--
 .../apache/samza/processor/StreamProcessor.java | 11 +--------
 .../StandaloneJobCoordinatorFactory.java        |  4 +---
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 25 +++++++++++++-------
 .../samza/zk/ZkJobCoordinatorFactory.java       |  6 ++---
 5 files changed, 22 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 056bdb1..d15bce1 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -32,6 +32,5 @@ public interface JobCoordinatorFactory {
    *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(int processorId, Config config,
-      SamzaContainerController containerController, CoordinationUtils coordinationUtils);
+  JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 3a62275..a39c3b9 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -23,8 +23,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.metrics.MetricsReporter;
@@ -119,24 +117,17 @@ public class StreamProcessor {
     updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
     Config updatedConfig = new MapConfig(updatedConfigMap);
 
-
     SamzaContainerController containerController = new SamzaContainerController(
         taskFactory,
         new TaskConfigJava(updatedConfig).getShutdownMs(),
         String.valueOf(processorId),
         customMetricsReporters);
 
-    CoordinationUtils jobCooridanationService = Util.
-        <CoordinationServiceFactory>getObj(
-            new JobCoordinatorConfig(updatedConfig)
-                .getJobCoordinationServiceFactoryClassName())
-        .getCoordinationService("groupId", String.valueOf(processorId), updatedConfig);
-
     this.jobCoordinator = Util.
         <JobCoordinatorFactory>getObj(
             new JobCoordinatorConfig(updatedConfig)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, updatedConfig, containerController, jobCooridanationService);
+        .getJobCoordinator(processorId, updatedConfig, containerController);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 3588dce..7ca85c0 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -19,15 +19,13 @@
 package org.apache.samza.standalone;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config,
-      SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
     return new StandaloneJobCoordinator(processorId, config, containerController);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 87d6bac..b88753f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,15 +18,11 @@
  */
 package org.apache.samza.zk;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
@@ -40,6 +36,13 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -61,8 +64,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private String newJobModelVersion;  // version published in ZK (by the leader)
   private JobModel jobModel;
 
-  public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
-      SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
+  public ZkJobCoordinator(int processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+      SamzaContainerController containerController) {
     this.zkUtils = zkUtils;
     this.keyBuilder = zkUtils.getKeyBuilder();
     this.debounceTimer = debounceTimer;
@@ -70,7 +73,11 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     this.containerController = containerController;
     this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
     this.config = config;
-    this.coordinationUtils = coordinationUtils;
+    this.coordinationUtils = Util.
+        <CoordinationServiceFactory>getObj(
+            new JobCoordinatorConfig(config)
+                .getJobCoordinationServiceFactoryClassName())
+        .getCoordinationService(groupId, String.valueOf(processorId), config);
 
     streamMetadataCache = getStreamMetadataCache();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/d7fc811d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 22ead65..915866d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -23,7 +23,6 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.processor.SamzaContainerController;
@@ -37,7 +36,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
     JobConfig jobConfig = new JobConfig(config);
     String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
     ZkConfig zkConfig = new ZkConfig(config);
@@ -47,6 +46,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
 
     return new ZkJobCoordinator(
         processorId,
+        "groupId",  // TODO: Usage of groupId to be resolved in SAMZA-1173
         config,
         debounceTimer,
         new ZkUtils(
@@ -55,6 +55,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
             zkClient,
             zkConfig.getZkConnectionTimeoutMs()
             ),
-        containerController, coordinationUtils);
+        containerController);
   }
 }

Reply via email to