SAMZA-1212 - Refactor interaction between StreamProcessor, JobCoordinator and 
SamzaContainer

See SAMZA-1212 for motivation toward this refactoring.
Changes here are:
* Removed awaitStart (blocking) method in StreamProcessor, JobCoordinator and 
SamzaContainer
* Introduced SamzaContainerListener and JobCoordinatorListener interface 
implemented by StreamProcessor
* Introduced SamzaContainerStatus to handle failures and lifecycle using 
Listener interfaces

Author: Navina Ramesh <[email protected]>

Reviewers: Xinyu Liu <[email protected]>, Prateek Maheshwari 
<[email protected]>

Closes #148 from navina/SAMZA-1212


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/475b4654
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/475b4654
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/475b4654

Branch: refs/heads/master
Commit: 475b4654cae52a4f92fe39c931a0e673048dd027
Parents: 0fb025b
Author: Navina Ramesh <[email protected]>
Authored: Wed May 3 15:10:13 2017 -0700
Committer: nramesh <[email protected]>
Committed: Wed May 3 15:10:13 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/SamzaContainerStatus.java  |  68 ++++
 .../samza/container/SamzaContainerListener.java |  55 ++++
 .../samza/coordinator/JobCoordinator.java       |  51 +--
 .../coordinator/JobCoordinatorFactory.java      |   8 +-
 .../coordinator/JobCoordinatorListener.java     |  61 ++++
 .../processor/SamzaContainerController.java     | 164 ----------
 .../apache/samza/processor/StreamProcessor.java | 307 ++++++++++++++----
 .../StreamProcessorLifecycleListener.java       |   3 +
 .../samza/runtime/LocalContainerRunner.java     |  26 +-
 .../standalone/StandaloneJobCoordinator.java    | 119 ++++---
 .../StandaloneJobCoordinatorFactory.java        |   5 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    |   1 -
 .../org/apache/samza/zk/ZkControllerImpl.java   |  15 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  44 +--
 .../samza/zk/ZkJobCoordinatorFactory.java       |   7 +-
 .../apache/samza/container/SamzaContainer.scala | 134 ++++++--
 .../org/apache/samza/job/local/ThreadJob.scala  |  10 +-
 .../samza/job/local/ThreadJobFactory.scala      |  51 +--
 .../samza/processor/TestStreamProcessor.java    | 176 +++++++++++
 .../samza/container/TestSamzaContainer.scala    | 316 +++++++++++++++++--
 .../processor/StreamProcessorTestUtils.scala    |  67 ++++
 .../system/kafka/TestKafkaSystemAdminJava.java  |  12 +-
 .../test/processor/TestStreamProcessor.java     |   2 +-
 .../test/integration/StreamTaskTestUtil.scala   |   3 +-
 24 files changed, 1259 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java 
b/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
new file mode 100644
index 0000000..4565de6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/SamzaContainerStatus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza;
+
+
+/**
+ * <pre>
+ *                                                                   runloop 
completed [OR]
+ *                  container.run()           runloop.run            
container.shutdown()
+ *    NOT_STARTED -----------------> STARTING ------------> STARTED 
-----------------------> STOPPED
+ *                                       |      Error in runloop |
+ *                                       |      [OR] Error when  |
+ *                           Error when  |      stopping         |
+ *                   starting components |      components       |
+ *                                       V                       |
+ *                                    FAILED <-------------------|
+ * </pre>
+ */
+
+/**
+ * Indicates the current status of a {@link 
org.apache.samza.container.SamzaContainer}
+ */
+public enum  SamzaContainerStatus {
+  /**
+   * Indicates that the container has not been started
+   */
+  NOT_STARTED,
+
+  /**
+   * Indicates that the container is starting all the components required by 
the
+   * {@link org.apache.samza.container.RunLoop} for processing
+   */
+  STARTING,
+
+  /**
+   * Indicates that the container started the {@link 
org.apache.samza.container.RunLoop}
+   */
+  STARTED,
+
+  /**
+   * Indicates that the container was successfully stopped either due to 
task-initiated shutdown
+   * (eg. end-of-stream triggered shutdown or application-driven shutdown of 
all tasks and hence, the container) or
+   * due to external shutdown requests (eg. from {@link 
org.apache.samza.processor.StreamProcessor})
+   */
+  STOPPED,
+
+  /**
+   * Indicates that the container failed during any of its 3 active states -
+   * {@link #STARTING}, {@link #STARTED}, {@link #STOPPED}
+   */
+  FAILED
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
new file mode 100644
index 0000000..a9c3b2c
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+/**
+ * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle 
events.
+ */
+public interface SamzaContainerListener {
+
+  /**
+   *  Method invoked when the {@link 
org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   *  the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is 
about to start the
+   *  {@link org.apache.samza.container.RunLoop}
+   */
+  void onContainerStart();
+
+  /**
+   *  Method invoked when the {@link 
org.apache.samza.container.SamzaContainer} has successfully transitioned to
+   *  {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on 
state transitions can be found in
+   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  <br>
+   *  <b>Note</b>: This will be the last call after completely shutting down 
the SamzaContainer without any
+   *  exceptions/errors.
+   * @param pausedByJm boolean indicating why the container was stopped. It 
should be {@literal true}, iff the container
+   *                    was stopped as a result of an expired {@link 
org.apache.samza.job.model.JobModel}. Otherwise,
+   *                    it should be {@literal false}
+   */
+  void onContainerStop(boolean pausedByJm);
+
+  /**
+   *  Method invoked when the {@link 
org.apache.samza.container.SamzaContainer} has  transitioned to
+   *  {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on 
state transitions can be found in
+   *  {@link org.apache.samza.SamzaContainerStatus}
+   *  <br>
+   *  <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive 
to {@link #onContainerStop(boolean)}.
+   * @param t Throwable that caused the container failure.
+   */
+  void onContainerFailed(Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index af2ef6a..bd06039 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -28,39 +28,41 @@ import org.apache.samza.job.model.JobModel;
  *  based on the underlying environment. In some cases, ID assignment is 
completely config driven, while in other
  *  cases, ID assignment may require coordination with JobCoordinators of 
other StreamProcessors.
  *
- *  This interface contains methods required for the StreamProcessor to 
interact with JobCoordinator.
+ *  StreamProcessor registers a {@link JobCoordinatorListener} in order to get 
notified about JobModel changes and
+ *  Coordinator state change.
+ *
+ * <pre>
+ *   {@code
+ *  *******************  start()                            
********************
+ *  *                 *----------------------------------->>*                  
*
+ *  *                 *         onNewJobModel    ************                  
*
+ *  *                 *<<------------------------* Job      *                  
*
+ *  *                 *     onJobModelExpired    * Co-      *                  
*
+ *  *                 *<<------------------------* ordinator*                  
*
+ *  * StreamProcessor *     onCoordinatorStop    * Listener *  JobCoordinator  
*
+ *  *                 *<<------------------------*          *                  
*
+ *  *                 *  onCoordinatorFailure    *          *                  
*
+ *  *                 *<<------------------------************                  
*
+ *  *                 *  stop()                             *                  
*
+ *  *                 *----------------------------------->>*                  
*
+ *  *******************                                     
********************
+ *  }
+ *  </pre>
  */
 @InterfaceStability.Evolving
 public interface JobCoordinator {
   /**
-   * Starts the JobCoordinator which involves one or more of the following:
-   * * LeaderElector Module initialization, if any
-   * * If leader, generate JobModel. Else, read JobModel
+   * Starts the JobCoordinator, which generally consists of participating in 
LeaderElection and listening for JobModel
+   * changes.
    */
   void start();
 
   /**
-   * Cleanly shutting down the JobCoordinator involves:
-   * * Shutting down the Container
-   * * Shutting down the LeaderElection module (TBD: details depending on 
leader or not)
+   * Stops the JobCoordinator and notifies the registered {@link 
JobCoordinatorListener}, if any
    */
   void stop();
 
   /**
-   * Waits for a specified amount of time for the JobCoordinator to fully 
start-up, which means it should be ready to
-   * process messages.
-   * In a Standalone use-case, it may be sufficient to wait for the container 
to start-up.
-   * In a ZK based Standalone use-case, it also includes registration with ZK, 
initialization of the
-   * leader elector module, container start-up etc.
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the JobCoordinator is started within the 
specified wait time and {@code false} if the
-   * waiting time elapsed
-   * @throws InterruptedException if the current thread is interrupted while 
waiting for the JobCoordinator to start-up
-   */
-  boolean awaitStart(long timeoutMs) throws InterruptedException;
-
-  /**
    * Returns the identifier assigned to the processor that is local to the 
instance of StreamProcessor.
    *
    * The semantics and format of the identifier returned should adhere to the 
specification defined in
@@ -71,6 +73,13 @@ public interface JobCoordinator {
   String getProcessorId();
 
   /**
+   * Registers a {@link JobCoordinatorListener} to receive notification on 
coordinator state changes and job model changes
+   *
+   * @param listener An instance of {@link JobCoordinatorListener}
+   */
+  void setListener(JobCoordinatorListener listener);
+
+  /**
    * Returns the current JobModel
    * The implementation of the JobCoordinator in the leader needs to know how 
to read the config and generate JobModel
    * In case of a non-leader, the JobCoordinator should simply fetch the 
jobmodel

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 7f7e1ed..784d48d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -20,17 +20,13 @@ package org.apache.samza.coordinator;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
-import org.apache.samza.processor.SamzaContainerController;
-
 
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * @param processorId {@link org.apache.samza.processor.StreamProcessor} id
+   * @param processorId Identifier for {@link 
org.apache.samza.processor.StreamProcessor} instance
    * @param config Configs relevant for the JobCoordinator TODO: Separate JC 
related configs into a "JobCoordinatorConfig"
-   * @param containerController Controller interface for starting and stopping 
container. In future, it may simply
-   *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(String processorId, Config config, 
SamzaContainerController containerController);
+  JobCoordinator getJobCoordinator(String processorId, Config config);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java
new file mode 100644
index 0000000..8e17032
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.job.model.JobModel;
+
+/**
+ * Listener interface that can be registered with a {@link 
org.apache.samza.coordinator.JobCoordinator} instance in order
+ * to receive notifications.
+ */
+public interface JobCoordinatorListener {
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} 
in the following scenarios:
+   * <ul>
+   *  <li>the existing {@link JobModel} is no longer valid due to 
re-balancing </li>
+   *  <li>JobCoordinator is shutting down</li>
+   * </ul>
+   */
+  void onJobModelExpired();
+
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} 
when there is new {@link JobModel}
+   * available for use by the processor.
+   *
+   * @param processorId String, representing the identifier of {@link 
org.apache.samza.processor.StreamProcessor}
+   * @param jobModel Current {@link JobModel} containing a {@link 
org.apache.samza.job.model.ContainerModel} for the
+   *                 given processorId
+   */
+  // TODO: Can change interface to ContainerModel if 
maxChangelogStreamPartitions can be made a part of ContainerModel
+  void onNewJobModel(String processorId, JobModel jobModel);
+
+  /**
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} 
when it is shutting down without any errors
+   */
+  void onCoordinatorStop();
+
+  /**
+   *
+   * Method invoked by a {@link org.apache.samza.coordinator.JobCoordinator} 
when it is shutting down with error.
+   * <b>Note</b>: This should be the last call after completely shutting down 
the JobCoordinator.
+   *
+   * @param t Throwable that was the cause of the JobCoordinator failure
+   */
+  void onCoordinatorFailure(Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
 
b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
deleted file mode 100644
index 4af413a..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.processor;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.samza.config.ClusterManagerConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.metrics.JmxServer;
-import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SamzaContainerController {
-  private static final Logger log = 
LoggerFactory.getLogger(SamzaContainerController.class);
-
-  private ExecutorService executorService;
-  private volatile SamzaContainer container;
-  private final Map<String, MetricsReporter> metricsReporterMap;
-  private final Object taskFactory;
-  private final long containerShutdownMs;
-  private final StreamProcessorLifecycleListener lifecycleListener;
-
-  // Internal Member Variables
-  private Future containerFuture;
-
-  /**
-   * Creates an instance of a controller for instantiating, starting and/or 
stopping {@link SamzaContainer}
-   * Requests to execute a container are submitted to the {@link 
ExecutorService}
-   *
-   * @param taskFactory         Factory that be used create instances of 
{@link org.apache.samza.task.StreamTask} or
-   *                            {@link org.apache.samza.task.AsyncStreamTask}
-   * @param containerShutdownMs How long the Samza container should wait for 
an orderly shutdown of task instances
-   * @param metricsReporterMap  Map of metric reporter name and {@link 
MetricsReporter} instance
-   * @param lifecycleListener {@link StreamProcessorLifecycleListener}
-   */
-  public SamzaContainerController(
-      Object taskFactory,
-      long containerShutdownMs,
-      Map<String, MetricsReporter> metricsReporterMap,
-      StreamProcessorLifecycleListener lifecycleListener) {
-    this.taskFactory = taskFactory;
-    this.metricsReporterMap = metricsReporterMap;
-    if (containerShutdownMs == -1) {
-      this.containerShutdownMs = TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS;
-    } else {
-      this.containerShutdownMs = containerShutdownMs;
-    }
-    // life cycle callbacks when shutdown and failure happens
-    this.lifecycleListener = lifecycleListener;
-  }
-
-  /**
-   * Instantiates a container and submits to the executor. This method does 
not actually wait for the container to
-   * fully start-up. For such a behavior, see {@link #awaitStart(long)}
-   * <p>
-   * <b>Note:</b> <i>This method does not stop a currently running container, 
if any. It is left up to the caller to
-   * ensure that the container has been stopped with stopContainer before 
invoking this method.</i>
-   *
-   * @param containerModel               {@link ContainerModel} instance to 
use for the current run of the Container
-   * @param config                       Complete configuration map used by 
the Samza job
-   * @param maxChangelogStreamPartitions Max number of partitions expected in 
the changelog streams
-   *                                     TODO: Try to get rid of 
maxChangelogStreamPartitions from method arguments
-   */
-  public void startContainer(ContainerModel containerModel, Config config, int 
maxChangelogStreamPartitions) {
-    LocalityManager localityManager = null;
-    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      localityManager = 
SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), 
config);
-    }
-    log.info("About to create container: " + containerModel.getProcessorId());
-    container = SamzaContainer$.MODULE$.apply(
-        containerModel.getProcessorId(),
-        containerModel,
-        config,
-        maxChangelogStreamPartitions,
-        localityManager,
-        new JmxServer(),
-        Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
-        taskFactory);
-    log.info("About to start container: " + containerModel.getProcessorId());
-    executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()
-        .setNameFormat("p-" + containerModel.getProcessorId() + 
"-container-thread-%d").build());
-    containerFuture = executorService.submit(() -> {
-        try {
-          container.run();
-          lifecycleListener.onShutdown();
-        } catch (Throwable t) {
-          lifecycleListener.onFailure(t);
-        }
-      });
-  }
-
-  /**
-   * Method waits for a specified amount of time for the container to fully 
start-up, which consists of class-loading
-   * all the components and start message processing
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting
-   * time elapsed
-   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
-   */
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return container.awaitStart(timeoutMs);
-  }
-
-  /**
-   * Stops a running container, if any. Invoking this method multiple times 
does not have any side-effects.
-   */
-  public void stopContainer() {
-    if (container == null) {
-      log.warn("Shutdown before a container was created.");
-      return;
-    }
-
-    container.shutdown();
-    try {
-      if (containerFuture != null)
-        containerFuture.get(containerShutdownMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException | ExecutionException e) {
-      log.error("Ran into problems while trying to stop the container in the 
processor!", e);
-    } catch (TimeoutException e) {
-      log.warn("Got Timeout Exception while trying to stop the container in 
the processor! The processor may not shutdown properly", e);
-    }
-  }
-
-  /**
-   * Shutsdown the controller by first stop any running container and then, 
shutting down the {@link ExecutorService}
-   */
-  public void shutdown() {
-    stopContainer();
-    if (executorService != null) {
-      executorService.shutdown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 1910594..6329f6c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -18,47 +18,64 @@
  */
 package org.apache.samza.processor;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.IllegalContainerStateException;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * StreamProcessor can be embedded in any application or executed in a 
distributed environment (aka cluster) as an
  * independent process.
  * <p>
- * <b>Usage Example:</b>
- * <pre>
- * StreamProcessor processor = new StreamProcessor(1, config);
- * processor.start();
- * try {
- *  boolean status = processor.awaitStart(TIMEOUT_MS);    // Optional - 
blocking call
- *  if (!status) {
- *    // Timed out
- *  }
- *  ...
- * } catch (InterruptedException ie) {
- *   ...
- * } finally {
- *   processor.stop();
- * }
- * </pre>
- * Note: A single JVM can create multiple StreamProcessor instances. It is 
safe to create StreamProcessor instances in
+ *
+ * <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It 
is safe to create StreamProcessor instances in
  * multiple threads.
  */
 @InterfaceStability.Evolving
 public class StreamProcessor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamProcessor.class);
+
   private final JobCoordinator jobCoordinator;
-  private final StreamProcessorLifecycleListener lifecycleListener;
-  private final String processorId;
+  private final StreamProcessorLifecycleListener processorListener;
+  private final Object taskFactory;
+  private final Map<String, MetricsReporter> customMetricsReporter;
+  private final Config config;
+  private final long taskShutdownMs;
+
+  private ExecutorService executorService;
+
+  private volatile SamzaContainer container = null;
+  // Latch used to synchronize between the JobCoordinator thread and the 
container thread, when the container is
+  // stopped due to re-balancing
+  private volatile CountDownLatch jcContainerShutdownLatch = new 
CountDownLatch(1);
+  private volatile boolean processorOnStartCalled = false;
+
+  @VisibleForTesting
+  JobCoordinatorListener jobCoordinatorListener = null;
 
   /**
    * Create an instance of StreamProcessor that encapsulates a JobCoordinator 
and Samza Container
@@ -70,84 +87,246 @@ public class StreamProcessor {
    * <p>
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the 
StreamProcessor, and NOT exposed to the user
    *
+   * @param processorId            String identifier for this processor
    * @param config                 Instance of config object - contains all 
configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that 
are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be 
used for creating task instances.
-   * @param lifecycleListener         listener to the StreamProcessor life 
cycle
+   * @param processorListener         listener to the StreamProcessor life 
cycle
    */
   public StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
-                         AsyncStreamTaskFactory asyncStreamTaskFactory, 
StreamProcessorLifecycleListener lifecycleListener) {
-    this(processorId, config, customMetricsReporters, (Object) 
asyncStreamTaskFactory, lifecycleListener);
+                         AsyncStreamTaskFactory asyncStreamTaskFactory, 
StreamProcessorLifecycleListener processorListener) {
+    this(processorId, config, customMetricsReporters, (Object) 
asyncStreamTaskFactory, processorListener);
   }
 
-
   /**
    *Same as {@link #StreamProcessor(String, Config, Map, 
AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
-   * @param lifecycleListener  listener to the StreamProcessor life cycle
+   * @param processorListener  listener to the StreamProcessor life cycle
    */
   public StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
-                         StreamTaskFactory streamTaskFactory, 
StreamProcessorLifecycleListener lifecycleListener) {
-    this(processorId, config, customMetricsReporters, (Object) 
streamTaskFactory, lifecycleListener);
+                         StreamTaskFactory streamTaskFactory, 
StreamProcessorLifecycleListener processorListener) {
+    this(processorId, config, customMetricsReporters, (Object) 
streamTaskFactory, processorListener);
   }
 
-  private StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
-                          Object taskFactory, StreamProcessorLifecycleListener 
lifecycleListener) {
-    this.processorId = processorId;
-
-    SamzaContainerController containerController = new 
SamzaContainerController(
-        taskFactory,
-        new TaskConfigJava(config).getShutdownMs(),
-        customMetricsReporters,
-        lifecycleListener);
-
-    this.jobCoordinator = Util.
+  /* package private */
+  JobCoordinator getJobCoordinator(String processorId) {
+    return Util.
         <JobCoordinatorFactory>getObj(
             new JobCoordinatorConfig(config)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, config, containerController);
+        .getJobCoordinator(processorId, config);
+  }
 
-    this.lifecycleListener = lifecycleListener;
+  @VisibleForTesting
+  StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, Object taskFactory,
+                  StreamProcessorLifecycleListener processorListener, 
JobCoordinator jobCoordinator) {
+    this.taskFactory = taskFactory;
+    this.config = config;
+    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+    this.customMetricsReporter = customMetricsReporters;
+    this.processorListener = processorListener;
+    this.jobCoordinator = jobCoordinator;
+    this.jobCoordinatorListener = createJobCoordinatorListener();
+    this.jobCoordinator.setListener(jobCoordinatorListener);
+  }
+
+  private StreamProcessor(String processorId, Config config, Map<String, 
MetricsReporter> customMetricsReporters,
+                          Object taskFactory, StreamProcessorLifecycleListener 
processorListener) {
+    this.taskFactory = taskFactory;
+    this.config = config;
+    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
+    this.customMetricsReporter = customMetricsReporters;
+    this.processorListener = processorListener;
+    this.jobCoordinator = getJobCoordinator(processorId);
+    this.jobCoordinator.setListener(createJobCoordinatorListener());
   }
 
   /**
-   * StreamProcessor Lifecycle: start()
-   * <ul>
-   * <li>Starts the JobCoordinator and fetches the JobModel</li>
-   * <li>jobCoordinator.start returns after starting the container using 
ContainerModel </li>
-   * </ul>
-   * When start() returns, it only guarantees that the container is 
initialized and submitted by the controller to
-   * execute
+   * Asynchronously starts this {@link StreamProcessor}.
+   * <p>
+   *   <b>Implementation</b>:
+   *   Starts the {@link JobCoordinator}, which will eventually start the 
{@link SamzaContainer} when a new
+   *   {@link JobModel} is available.
+   * </p>
    */
   public void start() {
     jobCoordinator.start();
-    lifecycleListener.onStart();
   }
 
   /**
-   * Method that allows the user to wait for a specified amount of time for 
the container to initialize and start
-   * processing messages
+   * <p>
+   * Asynchronously stops the {@link StreamProcessor}'s running components - 
{@link SamzaContainer}
+   * and {@link JobCoordinator}
+   * </p>
+   * There are multiple ways in which the StreamProcessor stops:
+   * <ol>
+   *   <li>Caller of StreamProcessor invokes stop()</li>
+   *   <li>Samza Container completes processing (eg. bounded input) and shuts 
down</li>
+   *   <li>Samza Container fails</li>
+   *   <li>Job Coordinator fails</li>
+   * </ol>
+   * When either container or coordinator stops (cleanly or due to exception), 
it will try to shutdown the
+   * StreamProcessor. This needs to be synchronized so that only one code path 
gets triggered for shutdown.
+   * <br>
+   * If container is running,
+   * <ol>
+   *   <li>container is shutdown cleanly and {@link 
SamzaContainerListener#onContainerStop(boolean)} will trigger
+   *   {@link JobCoordinator#stop()}</li>
+   *   <li>container fails to shutdown cleanly and {@link 
SamzaContainerListener#onContainerFailed(Throwable)} will
+   *   trigger {@link JobCoordinator#stop()}</li>
+   * </ol>
+   * If container is not running, then this method will simply shutdown the 
{@link JobCoordinator}.
    *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   * @return {@code true}, if the container started within the specified wait 
time and {@code false} if the waiting time
-   * elapsed
-   * @throws InterruptedException if the current thread is interrupted while 
waiting for container to start-up
    */
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return jobCoordinator.awaitStart(timeoutMs);
+  public synchronized void stop() {
+    boolean containerShutdownInvoked = false;
+    if (container != null) {
+      try {
+        LOGGER.info("Shutting down container " + container.toString() + " from 
StreamProcessor");
+        container.shutdown();
+        containerShutdownInvoked = true;
+      } catch (IllegalContainerStateException icse) {
+        LOGGER.info("Container was not running", icse);
+      }
+    }
+
+    if (!containerShutdownInvoked) {
+      LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+      jobCoordinator.stop();
+    }
+
   }
 
-  /**
-   * StreamProcessor Lifecycle: stop()
-   * <ul>
-   * <li>Stops the SamzaContainer execution</li>
-   * <li>Stops the JobCoordinator</li>
-   * </ul>
-   */
-  public void stop() {
-    jobCoordinator.stop();
+  SamzaContainer createSamzaContainer(ContainerModel containerModel, int 
maxChangelogStreamPartitions, JmxServer jmxServer) {
+    return SamzaContainer.apply(
+        containerModel,
+        config,
+        maxChangelogStreamPartitions,
+        jmxServer,
+        Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
+        taskFactory);
+  }
+
+  JobCoordinatorListener createJobCoordinatorListener() {
+    return new JobCoordinatorListener() {
+
+      @Override
+      public void onJobModelExpired() {
+        if (container != null) {
+          SamzaContainerStatus status = container.getStatus();
+          if (SamzaContainerStatus.NOT_STARTED.equals(status) || 
SamzaContainerStatus.STARTED.equals(status)) {
+            boolean shutdownComplete = false;
+            try {
+              LOGGER.info("Shutting down container in onJobModelExpired.");
+              container.pause();
+              shutdownComplete = 
jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+            } catch (IllegalContainerStateException icse) {
+              // Ignored since container is not running
+              LOGGER.info("Container was not running.", icse);
+              shutdownComplete = true;
+            } catch (InterruptedException e) {
+              LOGGER.warn("Container shutdown was interrupted!" + 
container.toString(), e);
+            }
+            if (!shutdownComplete) {
+              LOGGER.warn("Container " + container.toString() + " may not have 
shutdown successfully. " +
+                  "Stopping the processor.");
+              container = null;
+              stop();
+            } else {
+              LOGGER.debug("Container " + container.toString() + " shutdown 
successfully");
+            }
+          } else {
+            LOGGER.debug("Container " + container.toString() + " is not 
running.");
+          }
+        } else {
+          LOGGER.debug("Container is not instantiated yet.");
+        }
+      }
+
+      @Override
+      public void onNewJobModel(String processorId, JobModel jobModel) {
+        if (!jobModel.getContainers().containsKey(processorId)) {
+          LOGGER.warn("JobModel does not contain the processorId: " + 
processorId + ". Stopping the processor.");
+          stop();
+        } else {
+          jcContainerShutdownLatch = new CountDownLatch(1);
+
+          SamzaContainerListener containerListener = new 
SamzaContainerListener() {
+            @Override
+            public void onContainerStart() {
+              if (!processorOnStartCalled) {
+                // processorListener is called on start only the first time 
the container starts.
+                // It is not called after every re-balance of partitions among 
the processors
+                processorOnStartCalled = true;
+                if (processorListener != null) {
+                  processorListener.onStart();
+                }
+              } else {
+                LOGGER.debug("StreamProcessorListener was notified of 
container start previously. Hence, skipping this time.");
+              }
+            }
+
+            @Override
+            public void onContainerStop(boolean pauseByJm) {
+              if (pauseByJm) {
+                LOGGER.info("Container " + container.toString() + " stopped 
due to a request from JobCoordinator.");
+                if (jcContainerShutdownLatch != null) {
+                  jcContainerShutdownLatch.countDown();
+                }
+              } else {  // sp.stop was called or container stopped by itself
+                LOGGER.info("Container " + container.toString() + " stopped.");
+                container = null; // this guarantees that stop() doesn't try 
to stop container again
+                stop();
+              }
+            }
+
+            @Override
+            public void onContainerFailed(Throwable t) {
+              if (jcContainerShutdownLatch != null) {
+                jcContainerShutdownLatch.countDown();
+              } else {
+                LOGGER.warn("JobCoordinatorLatch was null. It is possible for 
some component to be waiting.");
+              }
+              LOGGER.error("Container failed. Stopping the processor.", t);
+              container = null;
+              stop();
+            }
+          };
+
+          container = createSamzaContainer(
+              jobModel.getContainers().get(processorId),
+              jobModel.maxChangeLogStreamPartitions,
+              new JmxServer());
+          container.setContainerListener(containerListener);
+          LOGGER.info("Starting container " + container.toString());
+          executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder()
+              .setNameFormat("p-" + processorId + 
"-container-thread-%d").build());
+          executorService.submit(container::run);
+        }
+      }
+
+      @Override
+      public void onCoordinatorStop() {
+        if (executorService != null) {
+          LOGGER.info("Shutting down the executor service.");
+          executorService.shutdownNow();
+        }
+        if (processorListener != null) {
+          processorListener.onShutdown();
+        }
+      }
+
+      @Override
+      public void onCoordinatorFailure(Throwable e) {
+        LOGGER.info("Coordinator Failed. Stopping the processor.");
+        stop();
+        if (processorListener != null) {
+          processorListener.onFailure(e);
+        }
+      }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
index 7bca074..6b8e3c7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
+++ 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
@@ -31,6 +31,9 @@ import org.apache.samza.annotation.InterfaceStability;
 public interface StreamProcessorLifecycleListener {
   /**
    * Callback when the {@link StreamProcessor} is started
+   * This callback is invoked only once when {@link 
org.apache.samza.container.SamzaContainer} starts for the first time
+   * in the {@link StreamProcessor}. When there is a re-balance of 
tasks/partitions among the processors, the container
+   * may temporarily be "paused" and re-started again. For such re-starts, 
this callback is NOT invoked.
    */
   void onStart();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 80350df..920cc3d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -33,6 +33,7 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaToJavaUtils;
 import org.apache.samza.util.Util;
@@ -55,6 +56,7 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
   private static final Logger log = 
LoggerFactory.getLogger(LocalContainerRunner.class);
   private final JobModel jobModel;
   private final String containerId;
+  private volatile Throwable containerException = null;
 
   public LocalContainerRunner(JobModel jobModel, String containerId) {
     super(jobModel.getConfig());
@@ -71,14 +73,30 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
       Object taskFactory = TaskFactoryUtil.createTaskFactory(config, 
streamApp, this);
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel.getProcessorId(),
           containerModel,
           config,
           jobModel.maxChangeLogStreamPartitions,
-          SamzaContainer.getLocalityManager(containerId, config),
           jmxServer,
           Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
           taskFactory);
+      container.setContainerListener(
+          new SamzaContainerListener() {
+            @Override
+            public void onContainerStart() {
+              log.info("Container Started");
+            }
+
+            @Override
+            public void onContainerStop(boolean invokedExternally) {
+              log.info("Container Stopped");
+            }
+
+            @Override
+            public void onContainerFailed(Throwable t) {
+              log.info("Container Failed");
+              containerException = t;
+            }
+          });
 
       container.run();
     } finally {
@@ -86,6 +104,10 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
         jmxServer.stop();
       }
     }
+    if (containerException != null) {
+      log.error("Container stopped with Exception. Exiting process now.", 
containerException);
+      System.exit(1);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index 0d74fb8..61ead18 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -18,18 +18,13 @@
  */
 package org.apache.samza.standalone;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.processor.SamzaContainerController;
-import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -38,6 +33,10 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Standalone Job Coordinator does not implement any leader elector module or 
cluster manager
  *
@@ -62,87 +61,79 @@ import org.slf4j.LoggerFactory;
  * </ul>
  * */
 public class StandaloneJobCoordinator implements JobCoordinator {
-  private static final Logger log = 
LoggerFactory.getLogger(StandaloneJobCoordinator.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StandaloneJobCoordinator.class);
   private final String processorId;
   private final Config config;
-  private final JobModel jobModel;
-  private final SamzaContainerController containerController;
+  private JobCoordinatorListener coordinatorListener = null;
 
-  @VisibleForTesting
-  StandaloneJobCoordinator(
-      ProcessorIdGenerator processorIdGenerator,
-      Config config,
-      SamzaContainerController containerController,
-      JobModel jobModel) {
-    this.processorId = processorIdGenerator.generateProcessorId(config);
-    this.config = config;
-    this.containerController = containerController;
-    this.jobModel = jobModel;
-  }
-
-  public StandaloneJobCoordinator(String processorId, Config config, 
SamzaContainerController containerController) {
-    this.config = config;
-    this.containerController = containerController;
+  public StandaloneJobCoordinator(String processorId, Config config) {
     this.processorId = processorId;
-
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = 
systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        log.error(String.format("A stream uses system %s, which is missing 
from the configuration.", systemName));
-        throw new SamzaException(String.format("A stream uses system %s, which 
is missing from the configuration.", systemName));
-      }
-      SystemFactory systemFactory = 
Util.<SystemFactory>getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, 
this.config));
-    }
-
-    StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 
5000, SystemClock.instance());
-
-    /** TODO:
-     * Locality Manager seems to be required in JC for reading locality info 
and grouping tasks intelligently and also,
-     * in SamzaContainer for writing locality info to the coordinator stream. 
This closely couples together
-     * TaskNameGrouper with the LocalityManager! Hence, groupers should be a 
property of the jobcoordinator
-     * (job.coordinator.task.grouper, instead of 
task.systemstreampartition.grouper)
-     */
-    this.jobModel = JobModelManager.readJobModel(this.config, 
Collections.emptyMap(), null, streamMetadataCache, null);
+    this.config = config;
   }
 
   @Override
   public void start() {
     // No-op
-    JobModel jobModel = getJobModel();
-    containerController.startContainer(
-        jobModel.getContainers().get(getProcessorId()),
-        jobModel.getConfig(),
-        jobModel.maxChangeLogStreamPartitions);
+    JobModel jobModel = null;
+    try {
+      jobModel = getJobModel();
+    } catch (Exception e) {
+      LOGGER.error("Exception while trying to getJobModel.", e);
+      if (coordinatorListener != null) {
+        coordinatorListener.onCoordinatorFailure(e);
+      }
+    }
+    if (jobModel != null && jobModel.getContainers().containsKey(processorId)) 
{
+      if (coordinatorListener != null) {
+        coordinatorListener.onNewJobModel(processorId, jobModel);
+      }
+    } else {
+      stop();
+    }
   }
 
   @Override
   public void stop() {
     // No-op
-    containerController.shutdown();
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+      coordinatorListener.onCoordinatorStop();
+    }
   }
 
-  /**
-   * Waits for a specified amount of time for the JobCoordinator to fully 
start-up, which means it should be ready to
-   * process messages. In a Standalone use-case, it may be sufficient to wait 
for the container to start-up. In case of
-   * ZK based Standalone use-case, it also includes registration with ZK, the 
initialization of leader elector module etc.
-   *
-   * @param timeoutMs Maximum time to wait, in milliseconds
-   */
   @Override
-  public boolean awaitStart(long timeoutMs) throws InterruptedException {
-    return containerController.awaitStart(timeoutMs);
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
   }
 
   @Override
   public JobModel getJobModel() {
-    return jobModel;
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = 
systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        LOGGER.error(String.format("A stream uses system %s, which is missing 
from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which 
is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = 
Util.<SystemFactory>getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, 
this.config));
+    }
+
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
+        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, 
SystemClock.instance());
+
+    /** TODO:
+     Locality Manager seems to be required in JC for reading locality info and 
grouping tasks intelligently and also,
+     in SamzaContainer for writing locality info to the coordinator stream. 
This closely couples together
+     TaskNameGrouper with the LocalityManager! Hence, groupers should be a 
property of the jobcoordinator
+     (job.coordinator.task.grouper, instead of 
task.systemstreampartition.grouper)
+     */
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), 
null, streamMetadataCache, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 0faeca9..8c27ebe 100644
--- 
a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -21,11 +21,10 @@ package org.apache.samza.standalone;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory 
{
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, 
SamzaContainerController containerController) {
-    return new StandaloneJobCoordinator(processorId, config, 
containerController);
+  public JobCoordinator getJobCoordinator(String processorId, Config config) {
+    return new StandaloneJobCoordinator(processorId, config);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 0afd840..20de43c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -119,7 +119,6 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
 
   @Override
   public void waitForBarrier(String version, String participantName, Runnable 
callback) {
-
     setPaths(version);
     final String barrierProcessorThis = String.format("%s/%s", 
barrierProcessors, participantName);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 61f7876..b6e3aed 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -58,24 +58,17 @@ public class ZkControllerImpl implements ZkController {
             keyBuilder.getJobModelPathPrefix()});
   }
 
-  private void onBecomeLeader() {
-
-    listenToProcessorLiveness(); // subscribe for adding new processors
-
-    // inform the caller
-    zkControllerListener.onBecomeLeader();
-
-  }
-
   @Override
   public void register() {
-
     // TODO - make a loop here with some number of attempts.
     // possibly split into two method - becomeLeader() and becomeParticipant()
     leaderElector.tryBecomeLeader(new LeaderElectorListener() {
       @Override
       public void onBecomingLeader() {
-        onBecomeLeader();
+        listenToProcessorLiveness();
+
+        // inform the caller
+        zkControllerListener.onBecomeLeader();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 1ddedbc..d2d0199 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -35,7 +35,7 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
@@ -53,23 +53,19 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   private final ZkUtils zkUtils;
   private final String processorId;
-
   private final ZkController zkController;
-  private final SamzaContainerController containerController;
   private final ScheduleAfterDebounceTime debounceTimer;
   private final StreamMetadataCache  streamMetadataCache;
-  private final ZkKeyBuilder keyBuilder;
   private final Config config;
   private final CoordinationUtils coordinationUtils;
 
+  private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
- 
-  public ZkJobCoordinator(String processorId, Config config, 
ScheduleAfterDebounceTime debounceTimer,
-                          SamzaContainerController containerController) {
+
+  public ZkJobCoordinator(String processorId, Config config, 
ScheduleAfterDebounceTime debounceTimer) {
+    this.processorId = processorId;
     this.debounceTimer = debounceTimer;
-    this.containerController = containerController;
     this.config = config;
-    this.processorId = processorId;
 
     this.coordinationUtils = Util.
         <CoordinationServiceFactory>getObj(
@@ -78,7 +74,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
         .getCoordinationService(new 
ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), 
config);
 
     this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
-    this.keyBuilder = zkUtils.getKeyBuilder();
     this.zkController = new ZkControllerImpl(processorId, zkUtils, 
debounceTimer, this);
 
     streamMetadataCache = getStreamMetadataCache();
@@ -109,20 +104,23 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
 
   @Override
   public void stop() {
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
     zkController.stop();
-    if (containerController != null)
-      containerController.stopContainer();
+    if (coordinatorListener != null) {
+      coordinatorListener.onCoordinatorStop();
+    }
   }
 
   @Override
-  public boolean awaitStart(long timeoutMs)
-      throws InterruptedException {
-    return containerController.awaitStart(timeoutMs);
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override
-  public String getProcessorId() {
-    return processorId;
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
   }
 
   @Override
@@ -147,13 +145,18 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     log.info("ZkJobCoordinator::onProcessorChange - list of processors 
changed! List size=" + processors.size());
     // if list of processors is empty - it means we are called from 
'onBecomeLeader'
     generateNewJobModel(processors);
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
   }
 
   @Override
   public void onNewJobModelAvailable(final String version) {
     log.info("pid=" + processorId + "new JobModel available");
     // stop current work
-    containerController.stopContainer();
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
     log.info("pid=" + processorId + "new JobModel available.Container 
stopped.");
     // get the new job model
     newJobModel = zkUtils.getJobModel(version);
@@ -179,8 +182,9 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     log.info("pid=" + processorId + "got the new job model in 
JobModelConfirmed =" + jobModel);
 
     // start the container with the new model
-    
containerController.startContainer(jobModel.getContainers().get(processorId), 
jobModel.getConfig(),
-        jobModel.maxChangeLogStreamPartitions);
+    if (coordinatorListener != null) {
+      coordinatorListener.onNewJobModel(processorId, jobModel);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index a44565c..a7239eb 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -22,7 +22,6 @@ package org.apache.samza.zk;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
-import org.apache.samza.processor.SamzaContainerController;
 
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
@@ -30,17 +29,15 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
    *
    * @param processorId - id of this processor
    * @param config - configs relevant for the JobCoordinator TODO: Separate JC 
related configs into a "JobCoordinatorConfig"
-   * @param containerController - controller to allow JobCoordinator control 
the SamzaContainer.
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(String processorId, Config config, 
SamzaContainerController containerController) {
+  public JobCoordinator getJobCoordinator(String processorId, Config config) {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
 
     return new ZkJobCoordinator(
         processorId,
         config,
-        debounceTimer,
-        containerController);
+        debounceTimer);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 8481c92..c7b2b7c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -22,15 +22,15 @@ package org.apache.samza.container
 import java.io.File
 import java.nio.file.Path
 import java.util
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 import java.net.{URL, UnknownHostException}
 
-import org.apache.samza.SamzaException
+import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.{Config, ShellCommandConfig, StorageConfig}
+import org.apache.samza.config.{ClusterManagerConfig, Config, 
ShellCommandConfig, StorageConfig}
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -48,7 +48,6 @@ import org.apache.samza.metrics.JmxServer
 import org.apache.samza.metrics.JvmMetrics
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.runtime.ApplicationRunner
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.serializers.model.SamzaObjectMapper
@@ -81,8 +80,7 @@ object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
   val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
-  def getLocalityManager(containerId: String, config: Config): LocalityManager 
= {
-    val containerName = getSamzaContainerName(containerId)
+  def getLocalityManager(containerName: String, config: Config): 
LocalityManager = {
     val registryMap = new MetricsRegistryMap(containerName)
     val coordinatorSystemProducer =
       new CoordinatorStreamSystemFactory()
@@ -108,20 +106,21 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
-  def getSamzaContainerName(containerId: String): String = {
-    "samza-container-%s" format containerId
-  }
-
   def apply(
-    containerId: String,
     containerModel: ContainerModel,
     config: Config,
     maxChangeLogStreamPartitions: Int,
-    localityManager: LocalityManager,
     jmxServer: JmxServer,
     customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter](),
     taskFactory: Object) = {
-    val containerName = getSamzaContainerName(containerId)
+    val containerId = containerModel.getProcessorId()
+    val containerName = "samza-container-%s" format containerId
+
+    var localityManager: LocalityManager = null
+    if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
+      localityManager = getLocalityManager(containerName, config)
+    }
+
     val containerPID = Util.getContainerPID
 
     info("Setting up Samza container: %s" format containerName)
@@ -627,23 +626,25 @@ class SamzaContainer(
   taskThreadPool: ExecutorService = null) extends Runnable with Logging {
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
-  private val runLoopStartLatch: CountDownLatch = new CountDownLatch(1)
   var shutdownHookThread: Thread = null
 
-  def awaitStart(timeoutMs: Long): Boolean = {
-    try {
-      runLoopStartLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
-    } catch {
-      case ie: InterruptedException =>
-        error("Interrupted while waiting for runloop to start!", ie)
-        throw ie
-    }
+  @volatile private var status = SamzaContainerStatus.NOT_STARTED
+  private var exceptionSeen: Throwable = null
+  private var paused: Boolean = false
+  private var containerListener: SamzaContainerListener = null
+
+  def getStatus(): SamzaContainerStatus = status
+
+  def setContainerListener(listener: SamzaContainerListener): Unit = {
+    containerListener = listener
   }
 
   def run {
     try {
       info("Starting container.")
 
+      status = SamzaContainerStatus.STARTING
+
       startMetrics
       startOffsetManager
       startLocalityManager
@@ -656,16 +657,24 @@ class SamzaContainer(
       startSecurityManger
 
       addShutdownHook
-      runLoopStartLatch.countDown()
       info("Entering run loop.")
+      status = SamzaContainerStatus.STARTED
+      if (containerListener != null) {
+        containerListener.onContainerStart()
+      }
       runLoop.run
     } catch {
       case e: Throwable =>
-        error("Caught exception/error in process loop.", e)
-        throw e
-    } finally {
+        if (status.equals(SamzaContainerStatus.STARTED)) {
+          error("Caught exception/error in run loop.", e)
+        } else {
+          error("Caught exception/error while initializing container.", e)
+        }
+        status = SamzaContainerStatus.FAILED
+        exceptionSeen = e
+    }
+    try {
       info("Shutting down.")
-
       removeShutdownHook
 
       shutdownConsumers
@@ -679,11 +688,64 @@ class SamzaContainer(
       shutdownMetrics
       shutdownSecurityManger
 
+      if (!status.equals(SamzaContainerStatus.FAILED)) {
+        status = SamzaContainerStatus.STOPPED
+      }
+
       info("Shutdown complete.")
+    } catch {
+      case e: Throwable =>
+        error("Caught exception/error while shutting down container.", e)
+        if (exceptionSeen == null) {
+          exceptionSeen = e
+        }
+        status = SamzaContainerStatus.FAILED
+    }
+
+    status match {
+      case SamzaContainerStatus.STOPPED =>
+        if (containerListener != null) {
+          containerListener.onContainerStop(paused)
+        }
+      case SamzaContainerStatus.FAILED =>
+        if (containerListener != null) {
+          containerListener.onContainerFailed(exceptionSeen)
+        }
     }
   }
 
-  def shutdown() = {
+  // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the 
future so that StreamProcessor can pause and
+  // unpause the container when the jobmodel changes.
+  /**
+   * Marks the [[SamzaContainer]] as being paused by the caller due to a 
change in [[JobModel]] and then, asynchronously
+   * shuts down this [[SamzaContainer]]
+   */
+  def pause(): Unit = {
+    paused = true
+    shutdown()
+  }
+
+  /**
+   * <p>
+   *   Asynchronously shuts down this [[SamzaContainer]]
+   * </p>
+   * <br>
+   * <b>Implementation</b>: Stops the [[RunLoop]], which will eventually 
transition the container from
+   * [[SamzaContainerStatus.STARTED]] to either 
[[SamzaContainerStatus.STOPPED]] or [[SamzaContainerStatus.FAILED]].
+   * Based on the final `status`, 
[[SamzaContainerListener#onContainerStop(boolean)]] or
+   * [[SamzaContainerListener#onContainerFailed(Throwable)]] will be invoked 
respectively.
+   *
+   * @throws IllegalContainerStateException (a SamzaException) when the container has already been 
stopped or failed
+   */
+  def shutdown(): Unit = {
+    if (status == SamzaContainerStatus.STOPPED || status == 
SamzaContainerStatus.FAILED) {
+      throw new IllegalContainerStateException("Cannot shutdown a container 
with status - " + status)
+    }
+    shutdownRunLoop()
+  }
+
+  // Shutdown Runloop
+  def shutdownRunLoop() = {
     runLoop match {
       case runLoop: RunLoop => runLoop.shutdown
       case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
@@ -809,10 +871,7 @@ class SamzaContainer(
     shutdownHookThread = new Thread("CONTAINER-SHUTDOWN-HOOK") {
       override def run() = {
         info("Shutting down, will wait up to %s ms" format shutdownMs)
-        runLoop match {
-          case runLoop: RunLoop => runLoop.shutdown
-          case asyncRunLoop: AsyncRunLoop => asyncRunLoop.shutdown()
-        }
+        shutdownRunLoop()  //TODO: Pull out shutdown hook to 
LocalContainerRunner or SP
         try {
           runLoopThread.join(shutdownMs)
         } catch {
@@ -923,3 +982,14 @@ class SamzaContainer(
     }
   }
 }
+
+/**
+ * Exception thrown when the SamzaContainer tries to transition to an illegal 
state.
+ * {@link SamzaContainerStatus} has more details on the state transitions.
+ *
+ * @param s String, Message associated with the exception
+ * @param t Throwable, Wrapped error/exception thrown, if any.
+ */
+class IllegalContainerStateException(s: String, t: Throwable) extends 
SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index e0522b1..a61a297 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,15 +19,9 @@
 
 package org.apache.samza.job.local
 
-import java.lang.Thread.UncaughtExceptionHandler
-
+import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, 
UnsuccessfulFinish}
+import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.util.Logging
-import org.apache.samza.job.StreamJob
-import org.apache.samza.job.ApplicationStatus
-import org.apache.samza.job.ApplicationStatus.New
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
-import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
 
 class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
   @volatile var jobStatus: Option[ApplicationStatus] = None

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index dcef3af..cb36863 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -20,19 +20,16 @@
 package org.apache.samza.job.local
 
 
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap}
-import org.apache.samza.runtime.LocalContainerRunner
-import org.apache.samza.task.TaskFactoryUtil
-import org.apache.samza.util.Logging
-import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
-import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.container.SamzaContainer
-import org.apache.samza.job.{ StreamJob, StreamJobFactory }
 import org.apache.samza.config.JobConfig._
+import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.container.{SamzaContainerListener, SamzaContainer}
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.job.{StreamJob, StreamJobFactory}
+import org.apache.samza.metrics.{JmxServer, MetricsReporter}
+import org.apache.samza.runtime.LocalContainerRunner
+import org.apache.samza.task.TaskFactoryUtil
+import org.apache.samza.util.Logging
 
 /**
  * Creates a new Thread job with the given config
@@ -54,18 +51,32 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
       case _ => None
     }
 
+    val containerListener = new SamzaContainerListener {
+      override def onContainerFailed(t: Throwable): Unit = {
+        error("Container failed.", t)
+        throw t
+      }
+
+      override def onContainerStop(pausedOrNot: Boolean): Unit = {
+      }
+
+      override def onContainerStart(): Unit = {
+
+      }
+    }
     try {
       coordinator.start
-      new ThreadJob(
-            SamzaContainer(
-              containerModel.getProcessorId,
-              containerModel,
-              config,
-              jobModel.maxChangeLogStreamPartitions,
-              null,
-              jmxServer,
-              Map[String, MetricsReporter](),
-              taskFactory))
+      val container = SamzaContainer(
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        jmxServer,
+        Map[String, MetricsReporter](),
+        taskFactory)
+      container.setContainerListener(containerListener)
+
+      val threadJob = new ThreadJob(container)
+      threadJob
     } finally {
       coordinator.stop
       jmxServer.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/475b4654/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
new file mode 100644
index 0000000..4a654dc
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.processor;
+
+import org.apache.samza.SamzaContainerStatus;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.JmxServer;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamProcessor {
+
+  class TestableStreamProcessor extends StreamProcessor {
+    private final CountDownLatch containerStop = new CountDownLatch(1);
+    private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
+    private SamzaContainer containerReference = null;
+
+    public TestableStreamProcessor(
+        Config config,
+        Map<String, MetricsReporter> customMetricsReporters,
+        StreamTaskFactory streamTaskFactory,
+        StreamProcessorLifecycleListener processorListener,
+        JobCoordinator jobCoordinator) {
+      super(config, customMetricsReporters, streamTaskFactory, 
processorListener, jobCoordinator);
+    }
+
+    @Override
+    SamzaContainer createSamzaContainer(
+        ContainerModel containerModel,
+        int maxChangelogStreamPartitions,
+        JmxServer jmxServer) {
+      RunLoop mockRunLoop = mock(RunLoop.class);
+      doAnswer(invocation ->
+        {
+          try {
+            runLoopStartForMain.countDown();
+            containerStop.await();
+          } catch (InterruptedException e) {
+            System.out.println("In exception" + e);
+            e.printStackTrace();
+          }
+          return null;
+        }).when(mockRunLoop).run();
+
+      doAnswer(invocation ->
+        {
+          containerStop.countDown();
+          return null;
+        }).when(mockRunLoop).shutdown();
+      containerReference = 
StreamProcessorTestUtils.getDummyContainer(mockRunLoop, null, 
mock(StreamTask.class));
+      return containerReference;
+    }
+  }
+
+  /**
+   * Tests stop() method when Container AND JobCoordinator are running
+   */
+  @Test
+  public void testStopByProcessor() {
+    JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+
+    final CountDownLatch processorListenerStop = new CountDownLatch(1);
+    final CountDownLatch processorListenerStart = new CountDownLatch(1);
+
+    TestableStreamProcessor processor = new TestableStreamProcessor(
+        new MapConfig(),
+        new HashMap<>(),
+        mock(StreamTaskFactory.class),
+        new StreamProcessorLifecycleListener() {
+          @Override
+          public void onStart() {
+            processorListenerStart.countDown();
+          }
+
+          @Override
+          public void onShutdown() {
+            processorListenerStop.countDown();
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+
+          }
+        },
+        mockJobCoordinator);
+
+    Map containers = mock(Map.class);
+    doReturn(true).when(containers).containsKey(anyString());
+    when(containers.get(anyString())).thenReturn(mock(ContainerModel.class));
+    JobModel mockJobModel = mock(JobModel.class);
+    when(mockJobModel.getContainers()).thenReturn(containers);
+
+    final CountDownLatch coordinatorStop = new CountDownLatch(1);
+    final Thread jcThread = new Thread(() ->
+      {
+        try {
+          processor.jobCoordinatorListener.onNewJobModel("1", mockJobModel);
+          coordinatorStop.await();
+          processor.jobCoordinatorListener.onCoordinatorStop();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      });
+
+    doAnswer(invocation ->
+      {
+        coordinatorStop.countDown();
+        return null;
+      }).when(mockJobCoordinator).stop();
+
+    doAnswer(invocation ->
+      {
+        jcThread.start();
+        return null;
+      }).when(mockJobCoordinator).start();
+
+    try {
+      processor.start();
+      processorListenerStart.await();
+
+      Assert.assertEquals(SamzaContainerStatus.STARTED, 
processor.containerReference.getStatus());
+
+      // This block is required to ensure that the mockRunloop has actually started.
+      // Otherwise, processor.stop gets triggered before mockRunloop begins to 
block
+      processor.runLoopStartForMain.await();
+
+      processor.stop();
+
+      processorListenerStop.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  // TODO:
+  // Test multiple start / stop and its ordering
+  // test onNewJobModel
+  // test onJobModelExpiry
+  // test Coordinator failure - correctly shuts down the streamprocessor
+  // test Container failure
+}

Reply via email to