Repository: samza
Updated Branches:
  refs/heads/samza-standalone a47e8819f -> 4918e3ad7


Pulling out from samza-li StandAloneApi


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7d6332b6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7d6332b6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7d6332b6

Branch: refs/heads/samza-standalone
Commit: 7d6332b69f02b9bdb5fda78983ef64eded79f66b
Parents: a47e881
Author: navina <[email protected]>
Authored: Fri Dec 23 16:58:14 2016 -0800
Committer: navina <[email protected]>
Committed: Fri Dec 23 16:58:14 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 gradle/dependency-versions.gradle               |   2 +-
 .../org/apache/samza/config/JavaJobConfig.java  |  27 +++
 .../samza/config/JobCoordinatorConfig.java      |  24 ++
 .../org/apache/samza/config/TaskConfigJava.java |  14 ++
 .../java/org/apache/samza/config/ZkConfig.java  |  30 +++
 .../samza/coordinator/JobCoordinator.java       |  58 +++++
 .../coordinator/JobCoordinatorFactory.java      |  30 +++
 .../leaderelection/LeaderElector.java           |   7 +
 .../processor/SamzaContainerController.java     | 127 ++++++++++
 .../apache/samza/processor/StreamProcessor.java | 148 ++++++++++++
 .../standalone/StandaloneJobCoordinator.java    | 121 ++++++++++
 .../StandaloneJobCoordinatorFactory.java        |  31 +++
 .../samza/zk/BarrierForVersionUpgrade.java      |   9 +
 .../samza/zk/ScheduleAfterDebounceTime.java     |  56 +++++
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 147 ++++++++++++
 .../java/org/apache/samza/zk/ZkController.java  |  11 +
 .../org/apache/samza/zk/ZkControllerImpl.java   | 136 +++++++++++
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 198 ++++++++++++++++
 .../samza/zk/ZkJobCoordinatorFactory.java       |  37 +++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |  45 ++++
 .../org/apache/samza/zk/ZkLeaderElector.java    | 110 +++++++++
 .../java/org/apache/samza/zk/ZkListener.java    |  11 +
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 237 +++++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  41 +++-
 .../samza/job/local/ThreadJobFactory.scala      |  11 +-
 26 files changed, 1659 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 5b41c52..0d60970 100644
--- a/build.gradle
+++ b/build.gradle
@@ -159,6 +159,7 @@ project(":samza-core_$scalaVersion") {
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
+    compile "com.101tec:zkclient:$zkClientVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 976a49c..872ae1b 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -26,7 +26,7 @@
   mockitoVersion = "1.8.4"
   scalaTestVersion = "2.2.4"
   zkClientVersion = "0.8"
-  zookeeperVersion = "3.3.4"
+  zookeeperVersion = "3.4.6"
   metricsVersion = "2.2.0"
   kafkaVersion = "0.10.0.1"
   commonsHttpClientVersion = "3.1"

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
new file mode 100644
index 0000000..c0747f0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaJobConfig.java
@@ -0,0 +1,27 @@
+package org.apache.samza.config;
+
+/**
+ * Typed view over a {@link Config} that exposes the job-level settings {@code job.name} and {@code job.id}.
+ */
+public class JavaJobConfig extends MapConfig {
+  private static final String DEFAULT_JOB_ID = "1";
+  private static final String JOB_ID = "job.id"; // streaming.job_id
+  private static final String JOB_NAME = "job.name"; // streaming.job_name
+
+  public JavaJobConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @return the value configured under {@code job.name}
+   * @throws ConfigException if {@code job.name} is not present in the config
+   */
+  public String getJobName() {
+    if (containsKey(JOB_NAME)) {
+      return get(JOB_NAME);
+    }
+    throw new ConfigException("Missing " + JOB_NAME + " config!");
+  }
+
+  /**
+   * @param defaultValue value to fall back to when {@code job.name} is not configured
+   * @return the value configured under {@code job.name}, or {@code defaultValue}
+   */
+  public String getJobName(String defaultValue) {
+    return get(JOB_NAME, defaultValue);
+  }
+
+  /**
+   * @return the value configured under {@code job.id}, or {@code "1"} when it is not configured
+   */
+  public String getJobId() {
+    return get(JOB_ID, DEFAULT_JOB_ID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
new file mode 100644
index 0000000..c8e496e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -0,0 +1,24 @@
+package org.apache.samza.config;
+
+import com.google.common.base.Strings;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.util.Util;
+
+/**
+ * Typed view over a {@link Config} that exposes the job-coordinator settings.
+ */
+public class JobCoordinatorConfig extends MapConfig {
+  // TODO: Change this to job-coordinator.factory
+  private static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
+
+  public JobCoordinatorConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Looks up the class name of the {@link JobCoordinatorFactory} to instantiate.
+   *
+   * @return the non-empty value configured under {@code job.coordinator.factory}
+   * @throws ConfigException if the key is missing or maps to an empty string
+   */
+  public String getJobCoordinatorFactoryClassName() {
+    final String factoryClassName = get(JOB_COORDINATOR_FACTORY);
+    if (!Strings.isNullOrEmpty(factoryClassName)) {
+      return factoryClassName;
+    }
+    throw new ConfigException(
+        String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 648fe58..a88e1ec 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -35,6 +35,10 @@ import scala.collection.JavaConversions;
 
 
 public class TaskConfigJava extends MapConfig {
+  // Task Configs
+  private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
+  public static final long DEFAULT_TASK_SHUTDOWN_MS = 5000L;
+
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
   public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
   private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
@@ -117,4 +121,14 @@ public class TaskConfigJava extends MapConfig {
 
     return Collections.unmodifiableSet(allInputSS);
   }
+
+  /**
+   * Reads {@code task.shutdown.ms}, i.e. how long to wait for the tasks to shut down.
+   *
+   * @return the configured value parsed as a long, or {@link #DEFAULT_TASK_SHUTDOWN_MS} when the key has no value
+   * @throws NumberFormatException if the configured value is not a valid long
+   */
+  public long getShutdownMs() {
+    final String configuredShutdownMs = get(TASK_SHUTDOWN_MS);
+    return configuredShutdownMs == null ? DEFAULT_TASK_SHUTDOWN_MS : Long.parseLong(configuredShutdownMs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
new file mode 100644
index 0000000..973db42
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -0,0 +1,30 @@
+package org.apache.samza.config;
+
+/**
+ * Typed view over a {@link Config} that exposes the ZooKeeper settings under {@code coordinator.zk.*}.
+ */
+public class ZkConfig extends MapConfig {
+  // Connection string for ZK, format: <hostname>:<port>,...
+  public static final String ZK_CONNECT = "coordinator.zk.connect";
+  public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms";
+  // Must be a different key from ZK_SESSION_TIMEOUT_MS; sharing one key would make a configured session
+  // timeout silently override the connection timeout (and its distinct default) as well.
+  public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms";
+
+  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
+  public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
+
+  public ZkConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @return the ZooKeeper connection string configured under {@code coordinator.zk.connect}
+   * @throws ConfigException if the key is not present in the config
+   */
+  public String getZkConnect() {
+    if (!containsKey(ZK_CONNECT)) {
+      throw new ConfigException("Missing " + ZK_CONNECT + " config!");
+    }
+    return get(ZK_CONNECT);
+  }
+
+  /**
+   * @return the value of {@code coordinator.zk.session-timeout-ms}, or
+   *         {@link #DEFAULT_SESSION_TIMEOUT_MS} when it is not configured
+   */
+  public int getZkSessionTimeoutMs() {
+    return getInt(ZK_SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS);
+  }
+
+  /**
+   * @return the value of {@code coordinator.zk.connection-timeout-ms}, or
+   *         {@link #DEFAULT_CONNECTION_TIMEOUT_MS} when it is not configured
+   */
+  public int getZkConnectionTimeoutMs() {
+    return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
new file mode 100644
index 0000000..ce0de2b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.job.model.JobModel;
+
+/**
+ * A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the
+ * StreamProcessor.
+ * In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require
+ * coordination with the JobCoordinators of other StreamProcessors.
+ */
+public interface JobCoordinator {
+  /**
+   * Starts the JobCoordinator, which involves one or more of the following:
+   * <ul>
+   * <li>LeaderElector module initialization, if any</li>
+   * <li>If leader, generate the JobModel; otherwise, read the JobModel</li>
+   * </ul>
+   */
+  void start();
+
+  /**
+   * Cleanly shutting down the JobCoordinator involves:
+   * <ul>
+   * <li>Shutting down the LeaderElection module (TBD: details depend on whether this instance is the leader)</li>
+   * <li>TBD</li>
+   * </ul>
+   */
+  void stop();
+
+  /**
+   * Returns the logical ID assigned to the processor.
+   * This may be specified by the user when used as a library; it is then up to the user to ensure that different
+   * instances of StreamProcessor have unique processor IDs.
+   * NOTE(review): how the ID is assigned in all other cases (possibly by the leader) is still an open design question.
+   *
+   * @return integer representing the logical processor ID
+   */
+  int getProcessorId();
+
+  /**
+   * Returns the current JobModel.
+   * The implementation of the JobCoordinator in the leader needs to know how to read the config and generate the
+   * JobModel. In case of a non-leader, the JobCoordinator should simply fetch the JobModel.
+   *
+   * @return instance of JobModel that describes the partition distribution among the processors (and hence, tasks)
+   */
+  JobModel getJobModel();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
new file mode 100644
index 0000000..af2aaa7
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.processor.SamzaContainerController;
+
+public interface JobCoordinatorFactory {
+  /**
+   * Creates the {@link JobCoordinator} for one processor.
+   *
+   * @param processorId identifier of the processor the coordinator is created for
+   * @param config Configs relevant for the JobCoordinator
+   *               TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @param containerController controller handed to the coordinator so that it can start and stop the container
+   * @return An instance of {@link JobCoordinator}
+   */
+  JobCoordinator getJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
new file mode 100644
index 0000000..fc3cac9
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java
@@ -0,0 +1,7 @@
+package org.apache.samza.coordinator.leaderelection;
+
+/**
+ * Contract for a leader-election module.
+ * NOTE(review): no implementation is visible alongside this interface; the per-method notes below are inferred
+ * from the method names and signatures only - confirm against the implementing class.
+ */
+public interface LeaderElector {
+  // Presumably attempts to acquire leadership; the boolean presumably reports whether it succeeded - TODO confirm
+  boolean tryBecomeLeader();
+  // Presumably gives up leadership if currently held - TODO confirm
+  void resignLeadership();
+  // Presumably reports whether this participant currently holds leadership - TODO confirm
+  boolean amILeader();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
 
b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
new file mode 100644
index 0000000..9352f27
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -0,0 +1,127 @@
+package org.apache.samza.processor;
+
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SamzaContainerController {
+  private static final Logger log = 
LoggerFactory.getLogger(SamzaContainerController.class);
+
+  private final ExecutorService executorService;
+  private volatile SamzaContainer container;
+  private final Map<String, MetricsReporter> metricsReporterMap;
+  private final Object taskFactory;
+  private final long containerShutdownMs;
+
+  // Internal Member Variables
+  private Future containerFuture;
+
+  /**
+   * Creates an instance of a controller for instantiating, starting and/or 
stopping {@link SamzaContainer}
+   * Requests to execute a container are submitted to the {@link 
ExecutorService}
+   *
+   * @param taskFactory Factory that be used create instances of {@link 
org.apache.samza.task.StreamTask} or
+   *                    {@link org.apache.samza.task.AsyncStreamTask}
+   * @param containerShutdownMs How long the Samza container should wait for 
an orderly shutdown of task instances
+   * @param metricsReporterMap Map of metric reporter name and {@link 
MetricsReporter} instance
+   */
+  public SamzaContainerController (
+      Object taskFactory,
+      long containerShutdownMs,
+      Map<String, MetricsReporter> metricsReporterMap) {
+    this.executorService = Executors.newSingleThreadExecutor();
+    this.taskFactory = taskFactory;
+    this.metricsReporterMap = metricsReporterMap;
+    if (containerShutdownMs == -1) {
+      this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS;
+    } else {
+      this.containerShutdownMs = containerShutdownMs;
+    }
+  }
+
+  /**
+   * Instantiates a container and submits to the executor. This method does 
not actually wait for the container to
+   * fully start-up. For such a behavior, see {@link #awaitStart(long)}
+   *
+   * <b>Note:</b> <i>This method does not stop a currently running container, 
if any. It is left up to the caller to
+   * ensure that the container has been stopped with stopContainer before 
invoking this method.</i>
+   *
+   * @param containerModel {@link ContainerModel} instance to use for the 
current run of the Container
+   * @param config Complete configuration map used by the Samza job
+   * @param maxChangelogStreamPartitions Max number of partitions expected in 
the changelog streams
+   * TODO: Try to get rid of maxChangelogStreamPartitions from method arguments
+   */
+  public void startContainer(ContainerModel containerModel, Config config, int 
maxChangelogStreamPartitions) {
+    LocalityManager localityManager = null;
+    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
+      localityManager = 
SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), 
config);
+    }
+    container = SamzaContainer$.MODULE$.apply(
+        containerModel.getContainerId(),
+        containerModel,
+        config,
+        maxChangelogStreamPartitions,
+        localityManager,
+        new JmxServer(),
+        Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
+        taskFactory);
+    containerFuture = executorService.submit(() -> container.run());
+  }
+
+  /**
+   * Method waits for a specified amount of time for the container to fully 
start-up, which consists of class-loading
+   * all the components and start message processing
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting
+   *          time elapsed
+   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
+   */
+  public boolean awaitStart(long timeoutMs) throws InterruptedException {
+    return container.awaitStart(timeoutMs);
+  }
+
+  /**
+   * Stops a running container, if any. Invoking this method multiple times 
does not have any side-effects.
+   */
+  public void stopContainer() {
+    if(container == null) {
+      log.warn("Shutdown before a container was created.");
+      return;
+    }
+
+    container.shutdown();
+    try {
+      containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException e) {
+      log.error("Ran into problems while trying to stop the container in the 
processor!", e);
+    } catch (TimeoutException e) {
+      log.warn("Got Timeout Exception while trying to stop the container in 
the processor! The processor may not shutdown properly", e);
+    }
+  }
+
+  /**
+   * Shutsdown the controller by first stop any running container and then, 
shutting down the {@link ExecutorService}
+   */
+  public void shutdown() {
+    stopContainer();
+    executorService.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
new file mode 100644
index 0000000..0f34400
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamProcessor can be embedded in any application or executed in a 
distributed environment (aka cluster) as
+ * independent processes <br />
+ * <p>
+ * <b>Usage Example:</b>
+ * <pre>
+ * StreamProcessor processor = new StreamProcessor(1, config); <br />
+ * processor.start();
+ * try {
+ *  boolean status = processor.awaitStart(TIMEOUT_MS);    // Optional - 
blocking call
+ *  if (!status) {
+ *    // Timed out
+ *  }
+ *  ...
+ * } catch (InterruptedException ie) {
+ *   ...
+ * } finally {
+ *   processor.stop();
+ * }
+ * </pre>
+ */
+public class StreamProcessor {
+  private static final Logger log = 
LoggerFactory.getLogger(StreamProcessor.class);
+  /**
+   * processor.id is equivalent to containerId in samza. It is a logical 
identifier used by Samza for a processor.
+   * In a distributed environment, this logical identifier is mapped to a 
physical identifier of the resource. For
+   * example, Yarn provides a "containerId" for every resource it allocates.
+   * In an embedded environment, this identifier is provided by the user by 
directly using the StreamProcessor API.
+   * <p>
+   * <b>Note:</b>This identifier has to be unique across the instances of 
StreamProcessors.
+   */
+  private static final String PROCESSOR_ID = "processor.id";
+  private final int processorId;
+  private final JobCoordinator jobCoordinator;
+  private final SamzaContainerController containerController;
+
+  /**
+   * Create an instance of StreamProcessor that encapsulates a JobCoordinator 
and Samza Container
+   * <p>
+   * JobCoordinator controls how the various StreamProcessor instances 
belonging to a job coordinate. It is also
+   * responsible generating and updating JobModel.
+   * When StreamProcessor starts, it starts the JobCoordinator and brings up a 
SamzaContainer based on the JobModel.
+   * SamzaContainer is executed using an ExecutorService. <br />
+   * <p>
+   * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the 
StreamProcessor, and NOT exposed to the user
+   *
+   * @param processorId            Unique identifier for a processor within 
the job. It has the same semantics as
+   *                               "containerId" in Samza
+   * @param config                 Instance of config object - contains all 
configuration required for processing
+   * @param customMetricsReporters Map of custom MetricReporter instances that 
are to be injected in the Samza job
+   */
+  public StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters) {
+    this(processorId, config, customMetricsReporters, (Object) null);
+  }
+
+  private StreamProcessor(int processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
+                          Object taskFactory) {
+    this.processorId = processorId;
+
+    Map<String, String> updatedConfigMap = new HashMap<>();
+    updatedConfigMap.putAll(config);
+    updatedConfigMap.put(PROCESSOR_ID, String.valueOf(processorId));
+    Config updatedConfig = new MapConfig(updatedConfigMap);
+
+
+    this.containerController = new SamzaContainerController(
+        taskFactory,
+        new TaskConfigJava(updatedConfig).getShutdownMs(),
+        customMetricsReporters);
+
+    this.jobCoordinator = Util.
+        <JobCoordinatorFactory>getObj(
+            new JobCoordinatorConfig(updatedConfig)
+                .getJobCoordinatorFactoryClassName())
+        .getJobCoordinator(processorId, updatedConfig, 
this.containerController);
+  }
+
+  /**
+   * StreamProcessor Lifecycle: start()
+   * <ul>
+   * <li>Starts the JobCoordinator and fetches the JobModel</li>
+   * <li>Starts the container using ContainerModel based on the processorId 
</li>
+   * </ul>
+   * When start() returns, it only guarantees that the container is 
initialized and submitted by the controller to
+   * execute
+   */
+  public void start() {
+    jobCoordinator.start();
+  }
+
+  /**
+   * Method that allows the user to wait for a specified amount of time for 
the container to initialize and start
+   * processing messages
+   *
+   * @param timeoutMs Maximum time to wait, in milliseconds
+   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting time
+   * elapsed
+   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
+   */
+  public boolean awaitStart(long timeoutMs) throws InterruptedException {
+    return containerController.awaitStart(timeoutMs); // TODO: Should 
awaitStart be part of the JC interface, instead of directly using container 
controller
+  }
+
+  /**
+   * StreamProcessor Lifecycle: stop()
+   * <ul>
+   * <li>Stops the SamzaContainer execution</li>
+   * <li>Stops the JobCoordinator</li>
+   * </ul>
+   */
+  public void stop() {
+    jobCoordinator.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
new file mode 100644
index 0000000..7fe1422
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standalone Job Coordinator does not implement any leader elector module or 
cluster manager
+ *
+ * It generates the JobModel using the Config passed into the constructor.
+ * Since the standalone JobCoordinator does not perform partition management, 
it allows two kinds of partition
+ * distribution mechanism - consumer-managed partition distribution and 
user-defined fixed partition distribution.
+ *
+ * */
+public class StandaloneJobCoordinator implements JobCoordinator {
+  private static final Logger log = 
LoggerFactory.getLogger(StandaloneJobCoordinator.class);
+  private final int processorId;
+  private final Config config;
+  private final JobModelManager jobModelManager;
+  private final SamzaContainerController containerController;
+
+  @VisibleForTesting
+  StandaloneJobCoordinator(
+      int processorId,
+      Config config,
+      SamzaContainerController containerController,
+      JobModelManager jobModelManager) {
+    this.processorId = processorId;
+    this.config = config;
+    this.containerController = containerController;
+    this.jobModelManager = jobModelManager;
+  }
+
+  public StandaloneJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController) {
+    this.processorId = processorId;
+    this.config = config;
+    this.containerController = containerController;
+
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = 
systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        log.error(String.format("A stream uses system %s, which is missing 
from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which 
is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, 
this.config));
+    }
+
+    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 
5000, SystemClock.instance());
+
+    /** TODO:
+     * Locality Manager seems to be required in JC for reading locality info 
and grouping tasks intelligently and also,
+     * in SamzaContainer for writing locality info to the coordinator stream. 
This closely couples together
+     * TaskNameGrouper with the LocalityManager! Hence, groupers should be a 
property of the jobcoordinator
+     * (job.coordinator.task.grouper, instead of 
task.systemstreampartition.grouper)
+     */
+    this.jobModelManager = 
JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, 
streamMetadataCache, null);
+  }
+
+  @Override
+  public void start() {
+    // No-op
+    JobModel jobModel = getJobModel();
+    containerController.startContainer(
+        jobModel.getContainers().get(processorId),
+        jobModel.getConfig(),
+        jobModel.maxChangeLogStreamPartitions);
+  }
+
+  @Override
+  public void stop() {
+    // No-op
+    containerController.shutdown();
+  }
+
+  @Override
+  public int getProcessorId() {
+    return this.processorId;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return jobModelManager.jobModel();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
new file mode 100644
index 0000000..7ca85c0
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory 
{
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, 
SamzaContainerController containerController) {
+    return new StandaloneJobCoordinator(processorId, config, 
containerController);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
new file mode 100644
index 0000000..691aced
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -0,0 +1,9 @@
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * A barrier that processors pass through when moving to a new job model version.
+ * NOTE(review): the descriptions below follow the ZooKeeper implementation
+ * (ZkBarrierForVersionUpgrade); other implementations may differ.
+ */
+public interface BarrierForVersionUpgrade {
+  /**
+   * Called on the leader to set up the barrier for the given version.
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsNames names of the processors that have to reach the barrier before it is
+   *                        considered done
+   */
+  void leaderStartBarrier(String version,  List<String> processorsNames);
+  /**
+   * Called by a processor to announce that it reached the barrier for the given version and to
+   * be called back once the barrier is done.
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsName name under which this processor registers at the barrier
+   * @param callback action to run after the barrier has been reached
+   */
+  void waitForBarrier(String version, String processorsName, Runnable 
callback);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
new file mode 100644
index 0000000..1854de6
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -0,0 +1,56 @@
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScheduleAfterDebounceTime {
+  public static final Logger LOG = 
LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+  public static final long timeoutMs = 1000*10;
+
+  public static final String JOB_MODEL_VERSION_CHANGE = 
"JobModelVersionChange";
+  public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+  public static final String ON_DATA_CHANGE_ON = "OnDataChanteOn";
+  public static final int DEBOUNCE_TIME_MS = 2000;
+
+
+  private final ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
+  private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+
+  public ScheduleAfterDebounceTime () {
+
+  }
+
+  synchronized public void scheduleAfterDebounceTime (String actionName, long 
debounceTimeMs, Runnable runnable) {//, final ReadyToCreateJobModelListener 
listener) {
+    // check if this action has been scheduled already
+    ScheduledFuture sf = futureHandles.get(actionName);
+    if(sf != null && !sf.isDone()) {
+      LOG.info(">>>>>>>>>>>DEBOUNCE: cancel future for " + actionName);
+      // attempt to cancel
+      if(! sf.cancel(false) ) {
+        try {
+          sf.get(timeoutMs, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          // we ignore the exception
+          LOG.warn("cancel for action " + actionName + " failed with ", e);
+        }
+      }
+      futureHandles.remove(actionName);
+    }
+    // schedule a new task
+    sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, 
TimeUnit.MILLISECONDS);
+    LOG.info(">>>>>>>>>>>DEBOUNCE: scheduled " + actionName + " in " + 
debounceTimeMs);
+    futureHandles.put(actionName, sf);
+  }
+
+  public void stopScheduler() {
+    // shutdown executor service
+    scheduledExecutorService.shutdown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
new file mode 100644
index 0000000..60a06da
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -0,0 +1,147 @@
+package org.apache.samza.zk;
+
+import java.util.List;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZooKeeper-backed {@link BarrierForVersionUpgrade}.
+ *
+ * For a version V the following nodes are used below the barrier prefix obtained from the
+ * ZkKeyBuilder:
+ *   barrier_V                          - root of the barrier for that version
+ *   barrier_V/barrier_processors/NAME  - created by each processor that reached the barrier
+ *   barrier_V/barrier_done             - the leader writes "done" here once every expected
+ *                                        processor is listed under barrier_processors
+ */
+public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
+  private final ZkUtils zkUtils;
+  private final ZkKeyBuilder keyBuilder;
+  private final static String BARRIER_DONE = "done";
+  private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  final private String barrierPrefix;
+
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) {
+    this.zkUtils = zkUtils;
+    keyBuilder = zkUtils.getKeyBuilder();
+
+    barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
+    this.debounceTimer = debounceTimer;
+  }
+
+  /**
+   * Creates the barrier nodes for {@code version} and subscribes to the list of processors that
+   * reached it. Whenever a child-change notification shows that all of {@code processorsNames}
+   * are present, "done" is written to the barrier_done node.
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsNames names of the processors expected at the barrier
+   */
+  @Override
+  public void leaderStartBarrier(String version, List<String> processorsNames) {
+    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+
+    // TODO - do we need a check if it exists - it needs to be deleted?
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
+
+    // callback for when the barrier is reached
+    Runnable callback = new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
+      }
+    };
+    // subscribe for processor's list changes
+    LOG.info("Subscribing for child changes at " + barrierProcessors);
+    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
+        new ZkBarrierChangeHandler(callback, processorsNames));
+  }
+
+  /**
+   * Registers this processor at the barrier for {@code version} and arranges for
+   * {@code callback} to be scheduled once the barrier_done node reports "done".
+   *
+   * @param version job model version the barrier belongs to
+   * @param processorsName name of the child node created for this processor
+   * @param callback action scheduled (via the debounce timer) after the barrier is done
+   */
+  @Override
+  public void waitForBarrier(String version, String processorsName, Runnable callback) {
+    // if participant makes this call it means it has already stopped the old container and got the new job model.
+    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+    String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
+
+    // Subscribe BEFORE announcing ourselves. If this processor is the last one the leader is
+    // waiting for, "done" may be written as soon as our child node appears; a listener
+    // registered only afterwards could miss that data change.
+    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath,
+        new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
+
+    // update the barrier for this processor
+    LOG.info("Creating a child for barrier at " + barrierProcessorThis);
+    zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+  }
+
+  /**
+   * Renders the items as "a,b,c," (each item followed by a comma) for logging.
+   */
+  private static String commaTerminated(List<String> items) {
+    StringBuilder sb = new StringBuilder();
+    for (String item : items) {
+      sb.append(item).append(",");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Leader-side listener on the barrier_processors node: runs the callback whenever a
+   * notification shows that every expected name is among the current children.
+   */
+  class ZkBarrierChangeHandler implements IZkChildListener {
+    Runnable callback;
+    List<String> names;
+
+    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
+      this.callback = callback;
+      this.names = names;
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
+      if (currentChildren == null) {
+        LOG.info("Got handleChildChange with null currentChildren");
+        return;
+      }
+      // debug
+      LOG.info("list of children in the barrier = " + parentPath + ":" + commaTerminated(currentChildren));
+      LOG.info("list of children to compare against = " + parentPath + ":" + commaTerminated(names));
+
+      // check if all the names are in; stop at the first one that is missing
+      for (String n : names) {
+        if (!currentChildren.contains(n)) {
+          LOG.info("node " + n + " is still not in the list ");
+          return;
+        }
+      }
+      LOG.info("ALl nodes reached the barrier");
+      callback.run(); // all the names have registered
+    }
+  }
+
+  /**
+   * Participant-side listener on the barrier_done node: once the node holds "done" it
+   * unsubscribes itself and schedules the callback on the debounce timer.
+   */
+  class ZkBarrierReachedHandler implements  IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    private final String barrierPathDone;
+    private final Runnable callback;
+    public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
+      this.barrierPathDone = barrierPathDone;
+      this.callback =  callback;
+      this.debounceTimer = debounceTimer;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data)
+    throws Exception {
+      String done = (String) data;
+      LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
+      // constant-first comparison: the node may carry no data, in which case done is null
+      if (BARRIER_DONE.equals(done)) {
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+        debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      } else {
+        // TODO do we need to resubscribe?
+      }
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath)
+    throws Exception {
+      LOG.warn("barrier done got deleted at " + dataPath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
new file mode 100644
index 0000000..20e55ab
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -0,0 +1,11 @@
+package org.apache.samza.zk;
+
+
+/**
+ * Operations a processor uses to take part in ZooKeeper-based coordination.
+ * NOTE(review): the descriptions below follow ZkControllerImpl; confirm before relying on them
+ * for other implementations.
+ */
+public interface ZkController {
+  /**
+   * Joins the group: attempts to become the leader and subscribes to job model version updates.
+   */
+  void register ();
+  /** @return true if this processor is currently the leader */
+  boolean isLeader();
+  /**
+   * Reports that the given job model version is available.
+   *
+   * @param version the job model version
+   */
+  void notifyJobModelChange(String version);
+  /** Leaves the group; the leader resigns its leadership first. */
+  void stop();
+  /** Subscribes to changes in the set of processors. */
+  void listenToProcessorLiveness();
+  /** @return the job model version currently stored in ZooKeeper */
+  String currentJobModelVersion();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
new file mode 100644
index 0000000..fdd1f02
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -0,0 +1,136 @@
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class ZkControllerImpl implements ZkController {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkControllerImpl.class);
+
+  private String processorIdStr;
+  private final ZkUtils zkUtils;
+  private final ZkListener zkListener;
+  private final ZkLeaderElector leaderElector;
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  public ZkControllerImpl (String processorIdStr, ZkUtils zkUtils, 
ScheduleAfterDebounceTime debounceTimer, ZkListener zkListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.zkListener = zkListener;
+    this.leaderElector = new ZkLeaderElector(this.processorIdStr, 
this.zkUtils, this.zkListener);
+    this.debounceTimer = debounceTimer;
+
+    init();
+  }
+
+  @Override
+  public void register() {
+
+    // TODO - make a loop here with some number of attempts.
+    // possibly split into two method - becomeLeader() and becomeParticipant()
+    boolean isLeader = leaderElector.tryBecomeLeader();
+    if(isLeader) {
+      listenToProcessorLiveness();
+
+      //      zkUtils.subscribeToProcessorChange(zkProcessorChangeListener);
+      
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> 
zkListener.onBecomeLeader());   // RECONSIDER MAKING THIS SYNC CALL
+
+    }
+
+    // subscribe to JobModel version updates
+    zkUtils.subscribeToJobModelVersionChange(new 
ZkJobModelVersionChangeHandler(debounceTimer));
+  }
+
+  private void init() {
+    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
+    zkUtils.makeSurePersistentPathsExists(new String[] {
+        keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), 
keyBuilder.getJobModelPathPrefix()});
+  }
+
+  @Override
+  public boolean isLeader() {
+    return leaderElector.amILeader();
+  }
+
+  @Override
+  public void notifyJobModelChange(String version) {
+    zkListener.onNewJobModelAvailable(version);
+  }
+
+  @Override
+  public void stop() {
+    if (isLeader()) {
+      leaderElector.resignLeadership();
+    }
+    zkUtils.close();
+  }
+
+  @Override
+  public void listenToProcessorLiveness() {
+    zkUtils.subscribeToProcessorChange(new 
ZkProcessorChangeHandler(debounceTimer));
+  }
+
+  @Override
+  public String currentJobModelVersion() {
+    return zkUtils.getJobModelVersion();
+  }
+
+  // Only by Leader
+  class ZkProcessorChangeHandler  implements IZkChildListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * Called when the children of the given path changed.
+     *
+     * @param parentPath    The parent path
+     * @param currentChilds The children or null if the root node (parent 
path) was deleted.
+     * @throws Exception
+     */
+    @Override
+    public void handleChildChange(String parentPath, List<String> 
currentChilds) throws Exception {
+      LOG.info(
+          "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - 
Path: " + parentPath + "  Current Children: "
+              + currentChilds);
+      
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> 
zkListener.onProcessorChange(currentChilds));
+    }
+  }
+
+  class ZkJobModelVersionChangeHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime 
debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * called when job model version gets updated
+     * @param dataPath
+     * @param data
+     * @throws Exception
+     */
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws 
Exception {
+      LOG.info("pid=" + processorIdStr + ". Got notification on version update 
change. path=" + dataPath + "; data="
+          + (String) data);
+
+      debounceTimer
+          
.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 
0, () -> notifyJobModelChange((String) data));
+    }
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      throw new SamzaException("version update path has been deleted!.");
+    }
+  }
+
+  public void shutdown() {
+    if(debounceTimer != null)
+      debounceTimer.stopScheduler();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
new file mode 100644
index 0000000..8c36ff2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -0,0 +1,198 @@
+package org.apache.samza.zk;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * JobCoordinator for stand alone processor managed via Zookeeper.
+ */
+public class ZkJobCoordinator implements JobCoordinator, ZkListener {
+  private static final Logger log = 
LoggerFactory.getLogger(ZkJobCoordinator.class);
+
+  private final ZkUtils zkUtils;
+  private final int processorId;
+  private final ZkController zkController;
+  private final SamzaContainerController containerController;
+
+  private final BarrierForVersionUpgrade barrier;
+
+
+  /////////////////////////////////////////
+  private JobModel newJobModel;
+  private String newJobModelVersion;  // version published in ZK (by the 
leader)
+  private Config config;
+  private ZkKeyBuilder keyBuilder;
+  private final ScheduleAfterDebounceTime debounceTimer;
+  //JobModelManager jobModelManager;
+
+  public ZkJobCoordinator(int processorId, Config config, 
ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, 
SamzaContainerController containerController) {
+    this.zkUtils = zkUtils;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.debounceTimer = debounceTimer;
+    this.processorId = processorId;
+    this.containerController = containerController;
+    this.zkController = new ZkControllerImpl(String.valueOf(processorId), 
zkUtils, debounceTimer, this);
+    this.config = config;
+
+
+    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); //should 
not have any state in it
+
+
+
+    // TEMP for model generation
+    //////////////////////////////// NEEDS TO BE REPLACED 
//////////////////////////////////////
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = 
systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        log.error(String.format("A stream uses system %s, which is missing 
from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which 
is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, 
this.config));
+    }
+
+    StreamMetadataCache
+        streamMetadataCache = new StreamMetadataCache(Util.<String, 
SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
+        .instance());
+
+    //jobModelManager = 
//JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, 
streamMetadataCache, null);
+
+    
////////////////////////////////////////////////////////////////////////////////////////////
+  }
+
+  @Override
+  public void start() {
+    zkController.register();
+  }
+
+  public void cleanupZk() {
+    zkUtils.deleteRoot();
+  }
+
+  @Override
+  public void stop() {
+    zkController.stop();
+  }
+
+  @Override
+  public int getProcessorId()
+  {
+    return processorId;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return newJobModel;
+  }
+
+  //////////////////////////////////////////////// LEADER stuff 
///////////////////////////
+  @Override
+  public void onBecomeLeader() {
+    log.info("ZkJobCoordinator::onBecomeLeader - I become the leader!");
+    //zkController.listenToProcessorLiveness();
+    // Reset debounce Timer
+
+    // generate JobProcess
+    generateNewJobModel();
+  }
+
+  private void generateNewJobModel() {
+    // get the current list of processors
+    List<String> currentProcessors = zkUtils.getActiveProcessors();
+
+    // get the current version
+    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String nextJMVersion;
+    if(currentJMVersion == null)
+      nextJMVersion = "1";
+    else
+      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
+    log.info("pid=" + processorId + "generating new model. Version = " + 
nextJMVersion);
+
+    Map<String, String> configMap = new HashMap<>();
+    Map<Integer, ContainerModel> containers = new HashMap<>();
+    MapConfig config = new MapConfig(configMap);
+    JobModel jobModel = new JobModel(config, containers);
+
+    log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+
+    // publish the new version
+    zkUtils.publishNewJobModel(nextJMVersion, jobModel);
+    log.info("pid=" + processorId + "published new JobModel ver=" + 
nextJMVersion + ";jm=" + jobModel);
+
+    // start the barrier for the job model update
+    barrier.leaderStartBarrier(nextJMVersion, currentProcessors);
+
+    // publish new JobModel version
+    zkUtils.publishNewJobModelVersion(currentJMVersion, nextJMVersion);
+    log.info("pid=" + processorId + "published new JobModel ver=" + 
nextJMVersion);
+  }
+
+   
//////////////////////////////////////////////////////////////////////////////////////////////
+  @Override
+  public void onProcessorChange(List<String> processorIds) {
+    // Reset debounce Timer
+    log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: 
" + Arrays.toString(processorIds.toArray()));
+    generateNewJobModel();
+  }
+
+  @Override
+  public void onNewJobModelAvailable(final String version) {
+    newJobModelVersion = version;
+    log.info("pid=" + processorId + "new JobModel available");
+    // stop current work
+    containerController.stopContainer();
+    log.info("pid=" + processorId + "new JobModel available.Container 
stopped.");
+    // get the new job model
+    newJobModel = zkUtils.getJobModel(version);
+    log.info("pid=" + processorId + "new JobModel available. ver=" + version + 
"; jm = " + newJobModel);
+
+
+    String currentPath = zkUtils.getEphemeralPath();
+
+    String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+
+    // update ZK and wait for all the processors to get this new version
+    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new 
Runnable() {
+      @Override
+      public void run() {
+        onNewJobModelConfirmed(version);
+      }
+    });
+  }
+
+  @Override
+  public void onNewJobModelConfirmed(String version) {
+    log.info("pid=" + processorId + "new version " + version + " of the job 
model got confirmed");
+    // get the new Model
+    // ?????
+    JobModel jobModel = getJobModel();
+    log.info("pid=" + processorId + "got the new job model =" + jobModel);
+    /*
+    containerController.startContainer(
+        jobModel.getContainers().get(processorId),
+        jobModel.getConfig(),
+        jobModel.maxChangeLogStreamPartitions);
+   */
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
new file mode 100644
index 0000000..90b0097
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -0,0 +1,37 @@
+package org.apache.samza.zk;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaJobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+/**
+ * {@link JobCoordinatorFactory} that produces ZooKeeper-based {@link ZkJobCoordinator} instances.
+ */
+public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+  /**
+   * Method to instantiate an implementation of JobCoordinator
+   *
+   * The ZooKeeper key prefix is "jobName-jobId", and a single ScheduleAfterDebounceTime is
+   * shared between the coordinator and its ZkUtils.
+   *
+   * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
+   * @param config      Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @param containerController controller of the container, handed to the coordinator
+   * @return An instance of JobCoordinator
+   */
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+    JavaJobConfig jobConfig = new JavaJobConfig(config);
+    String groupName = String.format("%s-%s", jobConfig.getJobName(), jobConfig.getJobId());
+    ZkConfig zkConfig = new ZkConfig(config);
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+
+    ZkUtils zkUtils = new ZkUtils(
+        new ZkKeyBuilder(groupName),
+        zkConfig.getZkConnect(),
+        debounceTimer,
+        String.valueOf(processorId),
+        zkConfig.getZkSessionTimeoutMs(),
+        zkConfig.getZkConnectionTimeoutMs());
+
+    return new ZkJobCoordinator(processorId, config, debounceTimer, zkUtils, containerController);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
new file mode 100644
index 0000000..7ad62be
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -0,0 +1,45 @@
+package org.apache.samza.zk;
+
+public class ZkKeyBuilder {
+  private final String pathPrefix;
+  public static final String PROCESSORS_PATH = "processors";
+
+  public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
+
+  public ZkKeyBuilder () {
+    this("");
+  }
+  public ZkKeyBuilder (String pathPrefix) {
+    this.pathPrefix = pathPrefix;
+  }
+
+  public String getProcessorsPath() {
+    return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
+  }
+
+  public static String parseIdFromPath(String path) {
+    if (path != null)
+     return path.substring(path.indexOf("processor-"));
+    return null;
+  }
+
+  public String getJobModelVersionPath() {
+    return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+  }
+
+  public String getJobModelPathPrefix() {
+    return String.format("/%s/jobModels", pathPrefix);
+  }
+
+  public String getJobModelPath(String jobModelVersion) {
+    return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
+  }
+
+  public String getJobModelVersionBarrierPrefix() {
+    return String.format("/%s/versionBarriers", pathPrefix);
+  }
+
+  public String getRootPath() {
+    return "/" + pathPrefix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
new file mode 100644
index 0000000..8bffeb6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -0,0 +1,110 @@
+package org.apache.samza.zk;
+
+import java.util.Arrays;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.leaderelection.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * {@link LeaderElector} built on the sorted list of processor nodes in ZooKeeper.
+ *
+ * The processor whose node sorts first is considered the leader. Every other processor
+ * watches the node immediately before its own and re-runs the election when that node
+ * is deleted.
+ */
+public class ZkLeaderElector implements LeaderElector {
+  public static final Logger log = LoggerFactory.getLogger(ZkLeaderElector.class);
+  private final ZkUtils zkUtils;
+  // Kept from the constructor; no callback on it is invoked from this class.
+  private final ZkListener zkListener;
+  private final String processorIdStr;
+  private final ZkKeyBuilder keyBuilder;
+
+  // Id of the processor seen as leader during the most recent election pass.
+  private String leaderId = null;
+  private final ZkLeaderListener zkLeaderListener = new ZkLeaderListener();
+  // Node name of the predecessor currently being watched; null while not subscribed.
+  private String currentSubscription = null;
+  // Source of the random pause taken before an election retry.
+  private final Random random = new Random();
+
+  /**
+   * @param processorIdStr identifier used to tag log lines
+   * @param zkUtils ZooKeeper helper; also supplies the {@link ZkKeyBuilder}
+   * @param zkListener listener stored for later use
+   */
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkListener zkListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.keyBuilder = this.zkUtils.getKeyBuilder();
+    this.zkListener = zkListener;
+  }
+
+  /**
+   * Runs the election: registers this processor if it has no ephemeral node yet, then
+   * checks its position among the sorted active processors. A non-leader subscribes to
+   * its predecessor's node. If that node has already vanished by the time it is
+   * checked, the election is repeated after a random pause of up to one second.
+   *
+   * @return true if this processor's node is first in the list, false otherwise
+   * @throws SamzaException if this processor's node is not among the active processors
+   */
+  @Override
+  public boolean tryBecomeLeader() {
+    // Retry in a loop rather than by recursion so repeated retries cannot grow the stack.
+    while (true) {
+      String currentPath = zkUtils.getEphemeralPath();
+
+      if (currentPath == null || currentPath.isEmpty()) {
+        zkUtils.registerProcessorAndGetId();
+        currentPath = zkUtils.getEphemeralPath();
+      }
+
+      List<String> children = zkUtils.getActiveProcessors();
+      String myId = ZkKeyBuilder.parseIdFromPath(currentPath);
+      int index = children.indexOf(myId);
+
+      if (index == -1) {
+        // Retry register here??
+        throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect??");
+      }
+
+      if (index == 0) {
+        log.info("pid=" + processorIdStr + " Eligible to be the leader!");
+        leaderId = myId;
+        return true;
+      }
+
+      log.info("pid=" + processorIdStr + ";index=" + index + ";children=" + Arrays.toString(children.toArray()) + " Not eligible to be a leader yet!");
+      leaderId = ZkKeyBuilder.parseIdFromPath(children.get(0));
+
+      // Watch only the node directly ahead of ours; move the subscription if it changed.
+      String prevCandidate = children.get(index - 1);
+      if (!prevCandidate.equals(currentSubscription)) {
+        if (currentSubscription != null) {
+          zkUtils.unsubscribeDataChanges(processorNodePath(currentSubscription), zkLeaderListener);
+        }
+        currentSubscription = prevCandidate;
+        log.info("pid=" + processorIdStr + "Subscribing to " + prevCandidate);
+        zkUtils.subscribeDataChanges(processorNodePath(currentSubscription), zkLeaderListener);
+      }
+
+      // Double check that the previous candidate still exists
+      if (zkUtils.exists(processorNodePath(currentSubscription))) {
+        log.info("pid=" + processorIdStr + "Previous candidate still exists. Continuing as non-leader");
+        return false;
+      }
+
+      // TODO - what actually happens here..
+      try {
+        Thread.sleep(random.nextInt(1000));
+      } catch (InterruptedException e) {
+        // sleep() cleared the interrupt status when it threw; set it again so callers
+        // and later blocking calls can observe the interruption. (Thread.interrupted()
+        // would only clear the flag.)
+        Thread.currentThread().interrupt();
+      }
+      log.info("pid=" + processorIdStr + "Previous candidate doesn't exist anymore. Trying to become leader again...");
+    }
+  }
+
+  /** Not implemented: this method does nothing. */
+  @Override
+  public void resignLeadership() {
+  }
+
+  /**
+   * @return true if this processor has an ephemeral node and its id equals the leader
+   *         id recorded by the last election pass
+   */
+  @Override
+  public boolean amILeader() {
+    String currentPath = zkUtils.getEphemeralPath();
+    return currentPath != null
+        && leaderId != null
+        && leaderId.equals(ZkKeyBuilder.parseIdFromPath(currentPath));
+  }
+
+  // Full path of a processor node given its child name under the processors path.
+  private String processorNodePath(String childName) {
+    return keyBuilder.getProcessorsPath() + "/" + childName;
+  }
+
+  /** Data listener that non-leaders register on their predecessor's node. */
+  class ZkLeaderListener implements IZkDataListener {
+
+    /** Logs the change; takes no other action. */
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      log.info("ZkLeaderListener::handleDataChange on path " + dataPath + " Data: " + data);
+    }
+
+    /** Re-runs the election once the watched node is gone. */
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      log.info("ZkLeaderListener::handleDataDeleted on path " + dataPath);
+      // NOTE(review): the result is ignored, so nothing is notified here when this
+      // processor becomes leader -- confirm whether zkListener should be called.
+      tryBecomeLeader();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
new file mode 100644
index 0000000..4a1c491
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkListener.java
@@ -0,0 +1,11 @@
+package org.apache.samza.zk;
+
+import java.util.List;
+
+/**
+ * Callbacks for coordination events.
+ * NOTE(review): the descriptions below come from the method names and the original
+ * inline comments; confirm them against the implementing coordinator.
+ */
+public interface ZkListener {
+  /** Presumably invoked when this processor becomes the leader -- TODO confirm. */
+  void onBecomeLeader();
+
+  /**
+   * Presumably invoked when the set of processors changes -- TODO confirm.
+   *
+   * @param processorIds processor ids supplied with the notification
+   */
+  void onProcessorChange(List<String> processorIds);
+
+  /**
+   * Start job model update (stop current work).
+   *
+   * @param version job model version
+   */
+  void onNewJobModelAvailable(String version);
+
+  /**
+   * Start new work according to the new model.
+   *
+   * @param version job model version
+   */
+  void onNewJobModelConfirmed(String version);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
new file mode 100644
index 0000000..6655468
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -0,0 +1,237 @@
+package org.apache.samza.zk;
+
+import java.io.IOException;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Wrapper around an I0Itec {@link ZkClient} offering the operations this package uses:
+ * processor registration, job model publishing and reading, and listener subscription.
+ */
+public class ZkUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
+
+  // Public lock object; it is not acquired anywhere inside this class.
+  public final ReentrantLock lock = new ReentrantLock();
+
+  private final ZkStateChangeHandler zkStateChangeHandler;
+  private final ZkClient zkClient;
+  private final ZkConnection zkConnnection;
+  // Full path of this processor's ephemeral node; null until registerProcessorAndGetId() ran.
+  private volatile String ephemeralPath = null;
+  private final ZkKeyBuilder keyBuilder;
+  private final int sessionTimeoutMs;
+  private final int connectionTimeoutMs;
+  private final ScheduleAfterDebounceTime debounceTimer;
+  // Used only to tag log lines.
+  private final String processorId;
+
+  /**
+   * Creates an instance with a default {@link ZkKeyBuilder} and the default session and
+   * connection timeouts from {@link ZkConfig}.
+   *
+   * @param zkConnectString ZooKeeper connect string
+   * @param debounceTimer timer handed to the connection state handler
+   * @param processorId identifier used to tag log lines
+   */
+  public ZkUtils(String zkConnectString, ScheduleAfterDebounceTime debounceTimer, String processorId) {
+    this(new ZkKeyBuilder(), zkConnectString, debounceTimer, processorId, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
+  /**
+   * Opens the ZooKeeper connection and client.
+   *
+   * @param zkKeyBuilder builder for all paths used by this instance
+   * @param zkConnectString ZooKeeper connect string
+   * @param debounceTimer timer handed to the connection state handler
+   * @param processorId identifier used to tag log lines
+   * @param sessionTimeoutMs session timeout passed to {@link ZkConnection}
+   * @param connectionTimeoutMs connection timeout passed to {@link ZkClient} and used by {@link #connect()}
+   */
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, String zkConnectString, ScheduleAfterDebounceTime debounceTimer, String processorId, int sessionTimeoutMs, int connectionTimeoutMs) {
+    this.keyBuilder = zkKeyBuilder;
+    this.sessionTimeoutMs = sessionTimeoutMs;
+    this.connectionTimeoutMs = connectionTimeoutMs;
+    this.zkConnnection = new ZkConnection(zkConnectString, this.sessionTimeoutMs);
+    this.zkClient = new ZkClient(zkConnnection, this.connectionTimeoutMs);
+    // Waits up to 10 seconds for SyncConnected; the boolean result is not checked here.
+    this.zkClient.waitForKeeperState(Watcher.Event.KeeperState.SyncConnected, 10000, TimeUnit.MILLISECONDS);
+    this.debounceTimer = debounceTimer;
+    this.zkStateChangeHandler = new ZkStateChangeHandler(debounceTimer);
+    this.processorId = processorId;
+  }
+
+  /**
+   * Waits for the client to be connected and then subscribes the state change handler.
+   *
+   * @throws RuntimeException if the client is not connected within the connection timeout
+   * @throws ZkInterruptedException declared by this method; presumably raised by the wait -- TODO confirm
+   */
+  public void connect() throws ZkInterruptedException {
+    boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
+    if (!isConnected) {
+      throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
+    }
+    zkClient.subscribeStateChanges(zkStateChangeHandler);
+  }
+
+  public ZkClient getZkClient() {
+    return zkClient;
+  }
+
+  public ZkConnection getZkConnnection() {
+    return zkConnnection;
+  }
+
+  public ZkKeyBuilder getKeyBuilder() {
+    return keyBuilder;
+  }
+
+  /**
+   * Creates every path in {@code paths} that does not exist yet as a persistent node.
+   *
+   * @param paths absolute ZooKeeper paths
+   */
+  public void makeSurePersistentPathsExists(String[] paths) {
+    for (String path : paths) {
+      if (!zkClient.exists(path)) {
+        // The 'true' flag asks ZkClient to create missing parent nodes as well.
+        zkClient.createPersistent(path, true);
+      }
+    }
+  }
+
+  /**
+   * Creates an ephemeral sequential "processor-" node under the processors path, with
+   * the local host name as its data, and remembers the resulting path.
+   *
+   * @return full path of the created node
+   * @throws RuntimeException if the local host name cannot be resolved
+   */
+  public synchronized String registerProcessorAndGetId() {
+    try {
+      // TODO: Data should be more than just the hostname. Use Json serialized data
+      ephemeralPath =
+          zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/processor-", InetAddress.getLocalHost().getHostName());
+      return ephemeralPath;
+    } catch (UnknownHostException e) {
+      // Keep the cause so the host name resolution failure shows up in the stack trace.
+      throw new RuntimeException("Failed to register as worker. Aborting...", e);
+    }
+  }
+
+  /** @return path of this processor's ephemeral node, or null if none was registered */
+  public synchronized String getEphemeralPath() {
+    return ephemeralPath;
+  }
+
+  /** @return the child node names under the processors path, sorted ascending */
+  public List<String> getActiveProcessors() {
+    List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath());
+    // Only evaluated when the JVM runs with assertions enabled.
+    assert children.size() > 0;
+    Collections.sort(children);
+    LOG.info("Found these children - " + children);
+    return children;
+  }
+
+  // ---- TEMP: the job model storage below still NEEDS to be discussed ----
+
+  /**
+   * Serializes {@code jobModel} to JSON and stores it in a new persistent node for
+   * {@code jobModelVersion}.
+   *
+   * @throws SamzaException wrapping any failure during serialization or node creation
+   */
+  public void publishNewJobModel(String jobModelVersion, JobModel jobModel) {
+    try {
+      // Assumption (needs to be verified): createPersistent FAILS if the node already exists.
+      ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+      String jobModelStr = mmapper.writeValueAsString(jobModel);
+      LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+      zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
+      LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Reads the node for {@code jobModelVersion} and deserializes its JSON content.
+   *
+   * @return the stored job model
+   * @throws SamzaException if the stored data cannot be parsed
+   */
+  public JobModel getJobModel(String jobModelVersion) {
+    LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+    Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+    ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
+    try {
+      return mmapper.readValue((String) data, JobModel.class);
+    } catch (IOException e) {
+      throw new SamzaException("failed to read JobModel from ZK", e);
+    }
+  }
+
+  // ---- end of TEMP section ----
+
+  /** @return the data stored at the job model version path */
+  public String getJobModelVersion() {
+    return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+  }
+
+  /**
+   * Replaces the published job model version, using the node's data version as a
+   * guard against concurrent writers.
+   *
+   * @param oldVersion the version the caller based its work on
+   * @param newVersion the version to publish
+   * @throws SamzaException if the stored version differs from {@code oldVersion}, or
+   *         if the node's data version did not advance by exactly one
+   */
+  public void publishNewJobModelVersion(String oldVersion, String newVersion) {
+    Stat stat = new Stat();
+    String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat.getVersion() + ")");
+    if (currentVersion != null && !currentVersion.equals(oldVersion)) {
+      throw new SamzaException("Someone change JMVersion while Leader was generating: expected" + oldVersion  + ", got " + currentVersion);
+    }
+    // Data version observed by the read above; passed to the write as the expected version.
+    int dataVersion = stat.getVersion();
+    stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+    if (stat.getVersion() != dataVersion + 1) {
+      // "current" is the version after the write, "old version" the one read before it.
+      throw new SamzaException("Someone changed data version of the JMVersion while Leader was generating a new one. current= " + stat.getVersion() + ", old version = " + dataVersion);
+    }
+
+    LOG.info("pid=" + processorId + " published new version: " + newVersion + "; expected dataVersion = " + dataVersion + "(" + stat.getVersion() + ")");
+  }
+
+  /**
+   * Subscribes to data changes of the job model version node.
+   *
+   * @param dataListener listener to register
+   */
+  public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
+    LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+    zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+  }
+
+  /**
+   * Subscribes to child changes of the processors node.
+   *
+   * @param listener listener to register
+   */
+  public void subscribeToProcessorChange(IZkChildListener listener) {
+    LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+    zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+  }
+
+  /* Wrappers for standard I0Itec methods */
+
+  public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
+    LOG.info("pid=" + processorId + " unsubscribing for data change at:" + path);
+    zkClient.unsubscribeDataChanges(path, dataListener);
+  }
+
+  public void subscribeDataChanges(String path, IZkDataListener dataListener) {
+    LOG.info("pid=" + processorId + " subscribing for data change at:" + path);
+    zkClient.subscribeDataChanges(path, dataListener);
+  }
+
+  public boolean exists(String path) {
+    return zkClient.exists(path);
+  }
+
+  public void close() {
+    zkClient.close();
+  }
+
+  /**
+   * Recursively deletes the key builder's root path if it exists.
+   * NOTE(review): with an empty key-builder prefix the root path is "/", so this would
+   * recurse from the ZooKeeper root -- confirm whether that case needs a guard.
+   */
+  public void deleteRoot() {
+    String rootPath = keyBuilder.getRootPath();
+    if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+      LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
+      zkClient.deleteRecursive(rootPath);
+    }
+  }
+
+  /** Connection state listener; every callback is currently an empty placeholder. */
+  class ZkStateChangeHandler implements IZkStateListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+
+    public ZkStateChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+
+    /**
+     * Called when the zookeeper connection state has changed.
+     *
+     * @param state The new state.
+     * @throws Exception On any error.
+     */
+    @Override
+    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been
+     * created. You would have to re-create any ephemeral nodes here.
+     *
+     * @throws Exception On any error.
+     */
+    @Override
+    public void handleNewSession() throws Exception {
+    }
+
+    /**
+     * Called when a session cannot be re-established. This should be used to implement
+     * connection failure handling e.g. retry to connect or pass the error up
+     *
+     * @param error The error that prevents a session from being established
+     * @throws Exception On any error.
+     */
+    @Override
+    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f1d62c5..f4d605f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -32,7 +32,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.{Config, ShellCommandConfig}
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -112,7 +112,14 @@ object SamzaContainer extends Logging {
 
     try {
       jmxServer = newJmxServer()
-      SamzaContainer(containerModel, jobModel, jmxServer).run
+      val containerModel = jobModel.getContainers.get(containerId.toInt)
+      SamzaContainer(
+        containerId.toInt,
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        getLocalityManager(containerId, config),
+        jmxServer).run
     } finally {
       if (jmxServer != null) {
         jmxServer.stop
@@ -120,6 +127,17 @@ object SamzaContainer extends Logging {
     }
   }
 
+  /** Creates a LocalityManager on top of a coordinator stream producer for `containerId`. */
+  def getLocalityManager(containerId: Int, config: Config): LocalityManager = {
+    val name = getSamzaContainerName(containerId)
+    val metricsRegistry = new MetricsRegistryMap(name)
+    val streamFactory = new CoordinatorStreamSystemFactory()
+    // The producer is given the registry exposed by this container's metrics holder.
+    val containerMetrics = new SamzaContainerMetrics(name, metricsRegistry)
+    val producer = streamFactory.getCoordinatorStreamSystemProducer(config, containerMetrics.registry)
+    new LocalityManager(producer)
+  }
+
   /**
    * Fetches config, task:SSP assignments, and task:changelog partition
    * assignments, and returns objects to be used for SamzaContainer's
@@ -136,10 +154,19 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
-  def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: 
JmxServer) = {
-    val config = jobModel.getConfig
-    val containerId = containerModel.getContainerId
-    val containerName = "samza-container-%s" format containerId
+  /** Returns the container name: "samza-container-" followed by the numeric id. */
+  def getSamzaContainerName(containerId: Int): String =
+    "samza-container-%d".format(containerId)
+
+  def apply(
+    containerId: Int,
+    containerModel: ContainerModel,
+    config: Config,
+    maxChangeLogStreamPartitions: Int,
+    localityManager: LocalityManager,
+    jmxServer: JmxServer,
+    customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter]()) = {
+    val containerName = getSamzaContainerName(containerId)
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
@@ -528,7 +555,7 @@ object SamzaContainer extends Logging {
         taskStores = taskStores,
         storeConsumers = storeConsumers,
         changeLogSystemStreams = changeLogSystemStreams,
-        jobModel.maxChangeLogStreamPartitions,
+        maxChangeLogStreamPartitions,
         streamMetadataCache = streamMetadataCache,
         storeBaseDir = defaultStoreBaseDir,
         loggedStoreBaseDir = loggedStorageBaseDir,

http://git-wip-us.apache.org/repos/asf/samza/blob/7d6332b6/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index eaab3a6..9ccf6fc 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -38,7 +38,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
     val coordinator = JobModelManager(config)
-    val containerModel = coordinator.jobModel.getContainers.get(0)
+    val jobModel = coordinator.jobModel
+    val containerModel = jobModel.getContainers.get(0)
 
     // Give developers a nice friendly warning if they've specified task.opts 
and are using a threaded job.
     config.getTaskOpts match {
@@ -52,7 +53,13 @@ class ThreadJobFactory extends StreamJobFactory with Logging 
{
         override def run(): Unit = {
           val jmxServer = new JmxServer
           try {
-            SamzaContainer(containerModel, coordinator.jobModel, 
jmxServer).run()
+            // Run the container, reusing the JmxServer that the finally block stops.
+            SamzaContainer(
+              containerModel.getContainerId,
+              containerModel,
+              config,
+              jobModel.maxChangeLogStreamPartitions,
+              null,
+              jmxServer).run()
           } finally {
             jmxServer.stop
           }

Reply via email to