This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 450e105  SAMZA-2161: Move ChangelogPartitionManager and CoordinatorStream ConfigReader to MetadataStore. (#990)
450e105 is described below

commit 450e1050389a538f5152cbb7ee9a58668e7667d8
Author: shanthoosh <[email protected]>
AuthorDate: Fri Apr 12 15:38:12 2019 -0700

    SAMZA-2161: Move ChangelogPartitionManager and CoordinatorStream ConfigReader to MetadataStore. (#990)
---
 .../clustermanager/ClusterBasedJobCoordinator.java |  73 +++++++-------
 .../metadatastore/CoordinatorStreamStore.java      |   1 +
 .../stream/CoordinatorStreamValueSerde.java        |   2 +-
 .../stream/messages/SetChangelogMapping.java       |   7 +-
 .../samza/storage/ChangelogStreamManager.java      |  74 ++++++++------
 .../org/apache/samza/storage/StorageRecovery.java  |  49 +++++-----
 .../apache/samza/checkpoint/CheckpointTool.scala   | 107 ++++++++++++---------
 .../apache/samza/coordinator/JobModelManager.scala |  19 ++--
 .../apache/samza/job/local/ProcessJobFactory.scala |  99 ++++++++++---------
 .../apache/samza/job/local/ThreadJobFactory.scala  |  22 +++--
 .../apache/samza/util/CoordinatorStreamUtil.scala  |  21 ++++
 .../TestClusterBasedJobCoordinator.java            |   7 +-
 .../CoordinatorStreamStoreTestUtil.java            |  10 +-
 .../stream/MockCoordinatorStreamSystemFactory.java |   1 +
 .../MockCoordinatorStreamWrappedConsumer.java      |  14 +--
 .../samza/checkpoint/TestCheckpointTool.scala      |  16 +--
 .../samza/coordinator/TestJobCoordinator.scala     |  40 +++++---
 .../samza/validation/YarnJobValidationTool.java    |  46 +++++----
 .../job/yarn/TestSamzaYarnAppMasterService.scala   |  45 +++++----
 19 files changed, 365 insertions(+), 288 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index d64665c..75f6a4a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -41,7 +41,9 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.PartitionChangeException;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.StreamRegexMonitor;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -52,6 +54,7 @@ import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +87,7 @@ import scala.collection.JavaConverters;
  */
 public class ClusterBasedJobCoordinator {
 
-  private static final Logger log = 
LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClusterBasedJobCoordinator.class);
 
   private final Config config;
   private final ClusterManagerConfig clusterManagerConfig;
@@ -112,11 +115,6 @@ public class ClusterBasedJobCoordinator {
    */
   private final ChangelogStreamManager changelogStreamManager;
 
-  /**
-   * Single instance of the coordinator stream to use.
-   */
-  private final CoordinatorStreamManager coordinatorStreamManager;
-
   /*
    * The interval for polling the Task Manager for shutdown.
    */
@@ -151,6 +149,9 @@ public class ClusterBasedJobCoordinator {
    * Metrics to track stats around container failures, needed containers etc.
    */
   private final MetricsRegistryMap metrics;
+  private final CoordinatorStreamStore coordinatorStreamStore;
+
+  private final SystemAdmins systemAdmins;
 
   /**
    * Internal variable for the instance of {@link JmxServer}
@@ -162,8 +163,6 @@ public class ClusterBasedJobCoordinator {
    */
   volatile private Exception coordinatorException = null;
 
-  private SystemAdmins systemAdmins = null;
-
   /**
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke 
run() to actually
    * run the jobcoordinator.
@@ -172,22 +171,17 @@ public class ClusterBasedJobCoordinator {
    *                                {@link 
org.apache.samza.job.model.JobModel} from.
    */
   public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) {
-
     metrics = new MetricsRegistryMap();
 
-    coordinatorStreamManager = new 
CoordinatorStreamManager(coordinatorSystemConfig, metrics);
-    // register ClusterBasedJobCoordinator with the CoordinatorStreamManager.
-    coordinatorStreamManager.register(getClass().getSimpleName());
-    // start the coordinator stream's underlying consumer and producer.
-    coordinatorStreamManager.start();
-    // bootstrap current configuration.
-    coordinatorStreamManager.bootstrap();
+    coordinatorStreamStore = new 
CoordinatorStreamStore(coordinatorSystemConfig, metrics);
+    coordinatorStreamStore.init();
+    config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
 
     // build a JobModelManager and ChangelogStreamManager and perform 
partition assignments.
-    changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping(), metrics);
+    changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE));
+    jobModelManager = JobModelManager.apply(config, 
changelogStreamManager.readPartitionMapping(),
+                                            coordinatorStreamStore, metrics);
 
-    config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
     state = new SamzaApplicationState(jobModelManager);
     // The systemAdmins should be started before partitionMonitor can be used. 
And it should be stopped when this coordinator is stopped.
@@ -209,7 +203,7 @@ public class ClusterBasedJobCoordinator {
    */
   public void run() {
     if (!isStarted.compareAndSet(false, true)) {
-      log.warn("Attempting to start an already started job coordinator. ");
+      LOG.warn("Attempting to start an already started job coordinator. ");
       return;
     }
     // set up JmxServer (if jmx is enabled)
@@ -223,7 +217,7 @@ public class ClusterBasedJobCoordinator {
 
     try {
       //initialize JobCoordinator state
-      log.info("Starting cluster based job coordinator");
+      LOG.info("Starting cluster based job coordinator");
 
       //create necessary checkpoint and changelog streams, if not created
       JobModel jobModel = jobModelManager.jobModel();
@@ -258,12 +252,12 @@ public class ClusterBasedJobCoordinator {
           Thread.sleep(jobCoordinatorSleepInterval);
         } catch (InterruptedException e) {
           isInterrupted = true;
-          log.error("Interrupted in job coordinator loop", e);
+          LOG.error("Interrupted in job coordinator loop", e);
           Thread.currentThread().interrupt();
         }
       }
     } catch (Throwable e) {
-      log.error("Exception thrown in the JobCoordinator loop ", e);
+      LOG.error("Exception thrown in the JobCoordinator loop", e);
       throw new SamzaException(e);
     } finally {
       onShutDown();
@@ -287,18 +281,18 @@ public class ClusterBasedJobCoordinator {
       inputStreamRegexMonitor.ifPresent(StreamRegexMonitor::stop);
       systemAdmins.stop();
       containerProcessManager.stop();
-      coordinatorStreamManager.stop();
+      coordinatorStreamStore.close();
     } catch (Throwable e) {
-      log.error("Exception while stopping cluster based job coordinator", e);
+      LOG.error("Exception while stopping cluster based job coordinator", e);
     }
-    log.info("Stopped cluster based job coordinator");
+    LOG.info("Stopped cluster based job coordinator");
 
     if (jmxServer != null) {
       try {
         jmxServer.stop();
-        log.info("Stopped JMX server");
+        LOG.info("Stopped Jmx Server");
       } catch (Throwable e) {
-        log.error("Exception while stopping JMX server", e);
+        LOG.error("Exception while stopping jmx server", e);
       }
     }
   }
@@ -314,7 +308,7 @@ public class ClusterBasedJobCoordinator {
         new JobConfig(config).getMonitorPartitionChangeFrequency(), 
streamsChanged -> {
       // Fail the jobs with durable state store. Otherwise, application 
state.status remains UNDEFINED s.t. YARN job will be restarted
         if (hasDurableStores) {
-          log.error("Input topic partition count changed in a job with durable 
state. Failing the job. " +
+          LOG.error("Input topic partition count changed in a job with durable 
state. Failing the job. " +
               "Changed topics: {}", streamsChanged.toString());
           state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
         }
@@ -326,7 +320,7 @@ public class ClusterBasedJobCoordinator {
 
     // if input regex monitor is not enabled return empty
     if (new JobConfig(config).getMonitorRegexEnabled()) {
-      log.info("StreamRegexMonitor is disabled.");
+      LOG.info("StreamRegexMonitor is disabled.");
       return Optional.empty();
     }
 
@@ -341,7 +335,7 @@ public class ClusterBasedJobCoordinator {
 
     // if no rewriter is defined, there is nothing to monitor
     if (!rewritersList.isDefined()) {
-      log.info("No config rewriters are defined. No StreamRegexMonitor 
created.");
+      LOG.warn("No config rewriters are defined. No StreamRegexMonitor 
created.");
       return Optional.empty();
     }
 
@@ -352,7 +346,7 @@ public class ClusterBasedJobCoordinator {
 
     // if there are no regexes to monitor
     if (inputRegexesToMonitor.isEmpty()) {
-      log.info("No input regexes are defined. No StreamRegexMonitor created.");
+      LOG.info("No input regexes are defined. No StreamRegexMonitor created.");
       return Optional.empty();
     }
 
@@ -362,8 +356,8 @@ public class ClusterBasedJobCoordinator {
           public void onInputStreamsChanged(Set<SystemStream> initialInputSet, 
Set<SystemStream> newInputStreams,
               Map<String, Pattern> regexesMonitored) {
             if (hasDurableStores) {
-              log.error("New input system-streams discovered. Failing the job 
since it is stateful. " +
-                  "New input streams: {}, Existing input streams: {}", 
newInputStreams, inputStreamsToMonitor);
+              LOG.error("New input system-streams discovered. Failing the job. 
New input streams: {}" +
+                  " Existing input streams: {}", newInputStreams, 
inputStreamsToMonitor);
               state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
             }
             coordinatorException = new InputStreamsDiscoveredException("New 
input streams discovered: " + newInputStreams);
@@ -389,19 +383,20 @@ public class ClusterBasedJobCoordinator {
    * @param args args
    */
   public static void main(String[] args) {
-    Config coordinatorSystemConfig = null;
+    Config coordinatorSystemConfig;
     final String coordinatorSystemEnv = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
     try {
       //Read and parse the coordinator system config.
-      log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
+      LOG.info("Parsing coordinator system config {}", coordinatorSystemEnv);
       coordinatorSystemConfig =
           new 
MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, 
Config.class));
+      LOG.info("Using the coordinator system config: {}.", 
coordinatorSystemConfig);
     } catch (IOException e) {
-      log.error("Error reading coordinator stream config.", e);
+      LOG.error("Exception while reading coordinator stream config", e);
       throw new SamzaException(e);
     }
     ClusterBasedJobCoordinator jc = new 
ClusterBasedJobCoordinator(coordinatorSystemConfig);
     jc.run();
-    log.info("Finished running ClusterBasedJobCoordinator");
+    LOG.info("Finished running ClusterBasedJobCoordinator");
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 87121f1..4752cdc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -185,6 +185,7 @@ public class CoordinatorStreamStore implements 
MetadataStore {
       systemProducer.flush(SOURCE);
     } catch (Exception e) {
       LOG.error("Exception occurred when flushing the metadata store:", e);
+      throw new SamzaException("Exception occurred when flushing the metadata 
store:", e);
     }
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index b0d5815..9b862bd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -59,7 +59,7 @@ public class CoordinatorStreamValueSerde implements 
Serde<String> {
       return setTaskContainerMapping.getTaskAssignment();
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
-      return String.valueOf(changelogMapping.getPartition());
+      return (changelogMapping.getPartition() != null) ? 
String.valueOf(changelogMapping.getPartition()) : null;
     } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
       SetConfig setConfig = new SetConfig(message);
       return setConfig.getConfigValue();
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
index 279ae06..912fe24 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.coordinator.stream.messages;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * The {@link SetChangelogMapping} message is used to store the changelog 
partition information for a particular task.
  * The structure looks like:
@@ -58,7 +60,8 @@ public class SetChangelogMapping extends 
CoordinatorStreamMessage {
     return getKey();
   }
 
-  public int getPartition() {
-    return Integer.parseInt(getMessageValue(CHANGELOG_VALUE_KEY));
+  public Integer getPartition() {
+    String changelogPartition = getMessageValue(CHANGELOG_VALUE_KEY);
+    return StringUtils.isNotBlank(changelogPartition) ? 
Integer.parseInt(changelogPartition) : null;
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 71635aa..4635856 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.storage;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -28,9 +29,9 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStream;
@@ -38,58 +39,73 @@ import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * The Changelog manager creates the changelog stream. If a coordinator stream 
manager is provided,
- * it can be used to read, write and update the changelog stream 
partition-to-task mapping.
+ * Responsible for creating the changelog stream. Used for reading, writing
+ * and updating the task to changelog stream partition association in metadata 
store.
  */
 public class ChangelogStreamManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogStreamManager.class);
-  // This is legacy for changelog. Need to investigate what happens if you use 
a different source name
-  private static final String SOURCE = "JobModelManager";
 
-  private final CoordinatorStreamManager coordinatorStreamManager;
+  private final MetadataStore metadataStore;
+  private final CoordinatorStreamValueSerde valueSerde;
 
   /**
-   * Construct changelog manager with a bootstrapped coordinator stream.
+   * Builds the ChangelogStreamManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   * Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   * and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   * to reuse it across different utility classes. Uses the {@link 
CoordinatorStreamValueSerde} to serialize
+   * messages before reading/writing into metadata store.
    *
-   * @param coordinatorStreamManager Coordinator stream manager.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write 
the container locality.
    */
-  public ChangelogStreamManager(CoordinatorStreamManager 
coordinatorStreamManager) {
-    this.coordinatorStreamManager = coordinatorStreamManager;
+  public ChangelogStreamManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetChangelogMapping.TYPE);
   }
 
   /**
-   * Read the taskName to partition mapping that is being maintained by this 
ChangelogManager
+   * Reads the taskName to changelog partition assignments from the {@link 
MetadataStore}.
+   *
    * @return TaskName to change LOG partition mapping, or an empty map if 
there were no messages.
    */
   public Map<TaskName, Integer> readPartitionMapping() {
     LOG.debug("Reading changelog partition information");
-    final HashMap<TaskName, Integer> changelogMapping = new HashMap<>();
-    for (CoordinatorStreamMessage coordinatorStreamMessage : 
coordinatorStreamManager.getBootstrappedStream(SetChangelogMapping.TYPE)) {
-      SetChangelogMapping changelogMapEntry = new 
SetChangelogMapping(coordinatorStreamMessage);
-      changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), 
changelogMapEntry.getPartition());
-      LOG.debug("TaskName: {} is mapped to {}", 
changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
-    }
+    final Map<TaskName, Integer> changelogMapping = new HashMap<>();
+    metadataStore.all().forEach((taskName, partitionIdAsBytes) -> {
+        String partitionId = valueSerde.fromBytes(partitionIdAsBytes);
+        LOG.debug("TaskName: {} is mapped to {}", taskName, partitionId);
+        if (StringUtils.isNotBlank(partitionId)) {
+          changelogMapping.put(new TaskName(taskName), 
Integer.valueOf(partitionId));
+        }
+      });
     return changelogMapping;
   }
 
   /**
-   * Write the taskName to partition mapping.
-   * @param changelogEntries The entries that needs to be written to the 
coordinator stream, the map takes the taskName
-   *                         and it's corresponding changelog partition.
+   * Writes the taskName to changelog partition assignments to the {@link 
MetadataStore}.
+   * @param changelogEntries a map of the taskName to the changelog partition 
to be written to
+   *                         metadata store.
    */
   public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
     LOG.debug("Updating changelog information with: ");
     for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
-      LOG.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), 
entry.getValue());
-      coordinatorStreamManager.send(new SetChangelogMapping(SOURCE, 
entry.getKey().getTaskName(), entry.getValue()));
+      Preconditions.checkNotNull(entry.getKey());
+      String taskName = entry.getKey().getTaskName();
+      if (entry.getValue() != null) {
+        String changeLogPartitionId = String.valueOf(entry.getValue());
+        LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
+        metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
+      } else {
+        LOG.debug("Deleting the TaskName: {}", taskName);
+        metadataStore.delete(taskName);
+      }
     }
   }
 
   /**
-   * Merge previous and new taskName to partition mapping and write it.
+   * Merges the previous and the new taskName to changelog partition mapping.
+   * Writes the merged taskName to partition mapping to {@link MetadataStore}.
    * @param prevChangelogEntries The previous map of taskName to changelog 
partition.
    * @param newChangelogEntries The new map of taskName to changelog partition.
    */
@@ -101,10 +117,10 @@ public class ChangelogStreamManager {
   }
 
   /**
-   * Utility method to create and validate changelog streams. The method is 
static because it does not require an
-   * instance of the {@link CoordinatorStreamManager}
-   * @param config Config with changelog info
-   * @param maxChangeLogStreamPartitions Maximum changelog stream partitions 
to create
+   * Creates and validates the changelog streams of a samza job.
+   *
+   * @param config the configuration with changelog info.
+   * @param maxChangeLogStreamPartitions the maximum number of changelog 
stream partitions to create.
    */
   public static void createChangelogStreams(Config config, int 
maxChangeLogStreamPartitions) {
     // Get changelog store config
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 79491d3..4a9ab26 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -33,7 +33,7 @@ import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.ContainerContextImpl;
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -46,6 +46,7 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.SystemClock;
@@ -61,17 +62,17 @@ import scala.Option;
  * from the job's config file.
  */
 public class StorageRecovery extends CommandLine {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StorageRecovery.class);
+
+  private final Config jobConfig;
+  private final File storeBaseDir;
+  private final SystemAdmins systemAdmins;
+  private final Map<String, SystemStream> changeLogSystemStreams = new 
HashMap<>();
+  private final Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories = new HashMap<>();
+  private final Map<String, ContainerStorageManager> containerStorageManagers 
= new HashMap<>();
 
-  private Config jobConfig;
   private int maxPartitionNumber = 0;
-  private File storeBaseDir = null;
-  private HashMap<String, SystemStream> changeLogSystemStreams = new 
HashMap<>();
-  private HashMap<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories = new HashMap<>();
   private Map<String, ContainerModel> containers = new HashMap<>();
-  private Map<String, ContainerStorageManager> containerStorageManagers = new 
HashMap<>();
-
-  private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
-  private SystemAdmins systemAdmins = null;
 
   /**
    * Construct the StorageRecovery
@@ -92,7 +93,7 @@ public class StorageRecovery extends CommandLine {
    * tasks.
    */
   private void setup() {
-    log.info("setting up the recovery...");
+    LOG.info("setting up the recovery...");
 
     getContainerModels();
     getChangeLogSystemStreamsAndStorageFactories();
@@ -106,7 +107,7 @@ public class StorageRecovery extends CommandLine {
   public void run() {
     setup();
 
-    log.info("start recovering...");
+    LOG.info("start recovering...");
 
     systemAdmins.start();
     this.containerStorageManagers.forEach((containerName, 
containerStorageManager) -> {
@@ -117,7 +118,7 @@ public class StorageRecovery extends CommandLine {
       });
     systemAdmins.stop();
 
-    log.info("successfully recovered in " + storeBaseDir.toString());
+    LOG.info("successfully recovered in " + storeBaseDir.toString());
   }
 
   /**
@@ -125,14 +126,18 @@ public class StorageRecovery extends CommandLine {
    */
   private void getContainerModels() {
     MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
-    CoordinatorStreamManager coordinatorStreamManager = new 
CoordinatorStreamManager(jobConfig, metricsRegistryMap);
-    coordinatorStreamManager.register(getClass().getSimpleName());
-    coordinatorStreamManager.start();
-    coordinatorStreamManager.bootstrap();
-    ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    JobModel jobModel = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping(), metricsRegistryMap).jobModel();
-    containers = jobModel.getContainers();
-    coordinatorStreamManager.stop();
+    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(jobConfig, metricsRegistryMap);
+    coordinatorStreamStore.init();
+    try {
+      Config configFromCoordinatorStream = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+      ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamStore);
+      JobModelManager jobModelManager = 
JobModelManager.apply(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
+                                                              
coordinatorStreamStore, metricsRegistryMap);
+      JobModel jobModel = jobModelManager.jobModel();
+      containers = jobModel.getContainers();
+    } finally {
+      coordinatorStreamStore.close();
+    }
   }
 
   /**
@@ -143,12 +148,12 @@ public class StorageRecovery extends CommandLine {
     JavaStorageConfig config = new JavaStorageConfig(jobConfig);
     List<String> storeNames = config.getStoreNames();
 
-    log.info("Got store names: " + storeNames.toString());
+    LOG.info("Got store names: " + storeNames.toString());
 
     for (String storeName : storeNames) {
       String streamName = config.getChangelogStream(storeName);
 
-      log.info("stream name for " + storeName + " is " + streamName);
+      LOG.info("stream name for " + storeName + " is " + streamName);
 
       if (streamName != null) {
         changeLogSystemStreams.put(storeName, 
StreamUtil.getSystemStreamFromNames(streamName));
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 25f7a17..2284ce4 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -29,16 +29,17 @@ import 
org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.JobRunner.{info, warn, _}
+import org.apache.samza.job.JobRunner.info
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Logging, Util}
+import org.apache.samza.util.{CommandLine, CoordinatorStreamUtil, Logging, 
Util}
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 
 import scala.collection.JavaConverters._
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.execution.JobPlanner
 import org.apache.samza.storage.ChangelogStreamManager
 
@@ -126,8 +127,9 @@ object CheckpointTool {
   }
 
   def apply(config: Config, offsets: TaskNameToCheckpointMap): CheckpointTool 
= {
-    val coordinatorStreamManager = new CoordinatorStreamManager(config, new 
MetricsRegistryMap())
-    new CheckpointTool(offsets, coordinatorStreamManager)
+    val metadataStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
+    metadataStore.init()
+    new CheckpointTool(offsets, metadataStore, config)
   }
 
   def rewriteConfig(config: JobConfig): Config = {
@@ -158,61 +160,70 @@ object CheckpointTool {
   }
 }
 
-class CheckpointTool(newOffsets: TaskNameToCheckpointMap, 
coordinatorStreamManager: CoordinatorStreamManager) extends Logging {
+class CheckpointTool(newOffsets: TaskNameToCheckpointMap, 
coordinatorStreamStore: CoordinatorStreamStore, userDefinedConfig: Config) 
extends Logging {
 
   def run() {
-    // Read the configuration stored in the coordinator stream.
-    coordinatorStreamManager.register(getClass.getSimpleName)
-    coordinatorStreamManager.start()
-    coordinatorStreamManager.bootstrap()
-    val configFromCoordinatorStream: Config = 
coordinatorStreamManager.getConfig
+    val configFromCoordinatorStream: Config = 
getConfigFromCoordinatorStream(coordinatorStreamStore)
+
+    println("Configuration read from the coordinator stream")
+    println(configFromCoordinatorStream)
+
+    val combinedConfigMap: util.Map[String, String] = new util.HashMap[String, 
String]()
+    combinedConfigMap.putAll(configFromCoordinatorStream)
+    combinedConfigMap.putAll(userDefinedConfig)
+    val combinedConfig: Config = new MapConfig(combinedConfigMap)
 
     // Instantiate the checkpoint manager with coordinator stream 
configuration.
-    val checkpointManager: CheckpointManager = 
configFromCoordinatorStream.getCheckpointManagerFactory() match {
+    val checkpointManager: CheckpointManager = 
combinedConfig.getCheckpointManagerFactory() match {
       case Some(className) =>
         Util.getObj(className, classOf[CheckpointManagerFactory])
-          .getCheckpointManager(configFromCoordinatorStream, new 
MetricsRegistryMap)
+          .getCheckpointManager(combinedConfig, new MetricsRegistryMap)
       case _ =>
         throw new SamzaException("Configuration: task.checkpoint.factory is 
not defined.")
     }
-
-    // Find all the TaskNames that would be generated for this job config
-    val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
-    val jobModelManager = JobModelManager(configFromCoordinatorStream, 
changelogManager.readPartitionMapping())
-    val taskNames = jobModelManager
-      .jobModel
-      .getContainers
-      .values
-      .asScala
-      .flatMap(_.getTasks.asScala.keys)
-      .toSet
-
-    taskNames.foreach(checkpointManager.register)
-    checkpointManager.start()
-
-    val lastCheckpoints = taskNames.map(taskName => {
-      taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
-                                          .getOrElse(new Checkpoint(new 
java.util.HashMap[SystemStreamPartition, String]()))
-                                          .getOffsets
-                                          .asScala
-                                          .toMap
-    }).toMap
-
-    lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current 
checkpoint for task: "+ lcp._1))
-
-    if (newOffsets != null) {
-      newOffsets.foreach {
-        case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) 
=> {
-          logCheckpoint(taskName, offsets, "New offset to be written for task: 
" + taskName)
-          val checkpoint = new Checkpoint(offsets.asJava)
-          checkpointManager.writeCheckpoint(taskName, checkpoint)
-          info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+    try {
+      // Find all the TaskNames that would be generated for this job config
+      val changelogManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
+      val jobModelManager = JobModelManager(combinedConfig, 
changelogManager.readPartitionMapping(), coordinatorStreamStore, new 
MetricsRegistryMap())
+      val taskNames = jobModelManager
+        .jobModel
+        .getContainers
+        .values
+        .asScala
+        .flatMap(_.getTasks.asScala.keys)
+        .toSet
+
+      taskNames.foreach(checkpointManager.register)
+      checkpointManager.start()
+
+      val lastCheckpoints = taskNames.map(taskName => {
+        taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
+          .getOrElse(new Checkpoint(new 
java.util.HashMap[SystemStreamPartition, String]()))
+          .getOffsets
+          .asScala
+          .toMap
+      }).toMap
+
+      lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current 
checkpoint for task: "+ lcp._1))
+
+      if (newOffsets != null) {
+        newOffsets.foreach {
+          case (taskName: TaskName, offsets: Map[SystemStreamPartition, 
String]) => {
+            logCheckpoint(taskName, offsets, "New offset to be written for 
task: " + taskName)
+            val checkpoint = new Checkpoint(offsets.asJava)
+            checkpointManager.writeCheckpoint(taskName, checkpoint)
+            info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+          }
         }
       }
-    }
+    } finally {
+      checkpointManager.stop()
+      coordinatorStreamStore.close()
+   }
+  }
 
-    checkpointManager.stop()
-    coordinatorStreamManager.stop()
+  def getConfigFromCoordinatorStream(coordinatorStreamStore: 
CoordinatorStreamStore): Config = {
+    return 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
   }
 
   def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, 
String], prefix: String) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 230a62d..4ada2f5 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -30,22 +30,20 @@ import org.apache.samza.config.Config
 import org.apache.samza.container.grouper.stream.SSPGrouperProxy
 import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task._
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
 import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
-import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
-import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
-import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metadatastore.MetadataStore
-import org.apache.samza.metadatastore.MetadataStoreFactory
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.LocationId
@@ -81,10 +79,9 @@ object JobModelManager extends Logging {
    * @param metricsRegistry the registry for reporting metrics.
    * @return the instantiated {@see JobModelManager}.
    */
-  def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, 
Integer], metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): 
JobModelManager = {
-    val coordinatorStreamStoreFactory: MetadataStoreFactory = new 
CoordinatorStreamMetadataStoreFactory()
-    val coordinatorStreamStore: MetadataStore = 
coordinatorStreamStoreFactory.getMetadataStore(SOURCE, config, metricsRegistry)
-    coordinatorStreamStore.init()
+  def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, 
Integer],
+            coordinatorStreamStore: CoordinatorStreamStore,
+            metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): 
JobModelManager = {
 
     // Instantiate the respective metadata store util classes which uses the 
same coordinator metadata store.
     val localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE))
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index d16c294..40884d6 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -20,16 +20,19 @@
 package org.apache.samza.job.local
 
 import java.util
+
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, 
StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging, Util}
+
 import scala.collection.JavaConversions._
 
 /**
@@ -44,59 +47,63 @@ class ProcessJobFactory extends StreamJobFactory with 
Logging {
     }
 
     val metricsRegistry = new MetricsRegistryMap()
-    val coordinatorStreamManager = new CoordinatorStreamManager(config, 
metricsRegistry)
-    coordinatorStreamManager.register(getClass.getSimpleName)
-    coordinatorStreamManager.start
-    coordinatorStreamManager.bootstrap
-    val changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager)
-
-    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, 
changelogStreamManager.readPartitionMapping(), metricsRegistry)
-    val jobModel = coordinator.jobModel
-
-    val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
-    for (containerModel <- jobModel.getContainers.values) {
-      for (taskModel <- containerModel.getTasks.values) {
-        taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
+    val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
+    coordinatorStreamStore.init()
+
+    try {
+      val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+
+      val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
+
+      val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
+      val jobModel = coordinator.jobModel
+
+      val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
+      for (containerModel <- jobModel.getContainers.values) {
+        for (taskModel <- containerModel.getTasks.values) {
+          taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
+        }
       }
-    }
 
-    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
-    coordinatorStreamManager.stop()
+      changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
-    //create necessary checkpoint and changelog streams
-    val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
-    if (checkpointManager != null) {
-      checkpointManager.createResources()
-    }
-    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
+      //create necessary checkpoint and changelog streams
+      val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+      if (checkpointManager != null) {
+        checkpointManager.createResources()
+      }
+      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
 
-    val containerModel = coordinator.jobModel.getContainers.get(0)
+      val containerModel = coordinator.jobModel.getContainers.get(0)
 
-    val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
-    info("Process job. using fwkPath = " + fwkPath)
+      val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
+      info("Process job. using fwkPath = " + fwkPath)
 
-    val commandBuilder = {
-      config.getCommandClass match {
-        case Some(cmdBuilderClassName) => {
-          // A command class was specified, so we need to use a process job to
-          // execute the command in its own process.
-          Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
-        }
-        case _ => {
-          info("Defaulting to ShellCommandBuilder")
-          new ShellCommandBuilder
+      val commandBuilder = {
+        config.getCommandClass match {
+          case Some(cmdBuilderClassName) => {
+            // A command class was specified, so we need to use a process job 
to
+            // execute the command in its own process.
+            Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
+          }
+          case _ => {
+            info("Defaulting to ShellCommandBuilder")
+            new ShellCommandBuilder
+          }
         }
       }
-    }
-    // JobCoordinator is stopped by ProcessJob when it exits
-    coordinator.start
+      // JobCoordinator is stopped by ProcessJob when it exits
+      coordinator.start
 
-    commandBuilder
-            .setConfig(config)
-            .setId("0")
-            .setUrl(coordinator.server.getUrl)
-            .setCommandPath(fwkPath)
+      commandBuilder
+        .setConfig(config)
+        .setId("0")
+        .setUrl(coordinator.server.getUrl)
+        .setCommandPath(fwkPath)
 
-    new ProcessJob(commandBuilder, coordinator)
+      new ProcessJob(commandBuilder, coordinator)
+    } finally {
+      coordinatorStreamStore.close()
+    }
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f505f22..d4b88d3 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -21,19 +21,21 @@ package org.apache.samza.job.local
 
 import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.context.{ExternalContext, JobContextImpl}
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, 
MetricsReporter}
 import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -46,14 +48,14 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
     info("Creating a ThreadJob, which is only meant for debugging.")
 
     val metricsRegistry = new MetricsRegistryMap()
-    val coordinatorStreamManager = new CoordinatorStreamManager(config, 
metricsRegistry)
-    coordinatorStreamManager.register(getClass.getSimpleName)
-    coordinatorStreamManager.start
-    coordinatorStreamManager.bootstrap
-    val changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager)
+    val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
+    coordinatorStreamStore.init()
 
-    val coordinator = JobModelManager(coordinatorStreamManager.getConfig, 
changelogStreamManager.readPartitionMapping(), metricsRegistry)
+    val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
 
+    val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
+
+    val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
     val jobModel = coordinator.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = 
mutable.Map[TaskName, Integer]()
@@ -76,7 +78,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val containerId = "0"
     var jmxServer: JmxServer = null
     if (new JobConfig(config).getJMXEnabled) {
-      jmxServer = new JmxServer();
+      jmxServer = new JmxServer()
     }
 
     val appDesc = 
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), 
config)
@@ -130,10 +132,10 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
       threadJob
     } finally {
       coordinator.stop
-      coordinatorStreamManager.stop()
       if (jmxServer != null) {
         jmxServer.stop
       }
+      coordinatorStreamStore.close()
     }
   }
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 0d767c6..2fca36c 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -21,10 +21,14 @@
 
 package org.apache.samza.util
 
+import java.util
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
 import org.apache.samza.system.{SystemFactory, SystemStream}
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
+import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 import scala.collection.immutable.Map
@@ -91,4 +95,21 @@ object CoordinatorStreamUtil {
     (config.getName.getOrElse(throw new ConfigException("Missing required 
config: job.name")),
       config.getJobId)
   }
+
+  /**
+    * Reads and returns the complete configuration stored in the coordinator 
stream.
+    * @param coordinatorStreamStore an instance of the instantiated {@link 
CoordinatorStreamStore}.
+    * @return the configuration read from the coordinator stream.
+    */
+  def readConfigFromCoordinatorStream(coordinatorStreamStore: 
CoordinatorStreamStore): Config = {
+    val namespaceAwareCoordinatorStreamStore: 
NamespaceAwareCoordinatorStreamStore = new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE)
+    val configFromCoordinatorStream: util.Map[String, Array[Byte]] = 
namespaceAwareCoordinatorStreamStore.all
+    val configMap: util.Map[String, String] = new util.HashMap[String, String]
+    for ((key: String, valueAsBytes: Array[Byte]) <- 
configFromCoordinatorStream.asScala) {
+      val valueSerde: CoordinatorStreamValueSerde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE)
+      val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
+      configMap.put(key, valueAsString)
+    }
+    new MapConfig(configMap)
+  }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index e2776b2..cb1e099 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
@@ -55,7 +56,7 @@ public class TestClusterBasedJobCoordinator {
   Map<String, String> configMap;
 
   @Before
-  public void setUp() throws NoSuchFieldException, NoSuchMethodException {
+  public void setUp() {
     configMap = new HashMap<>();
     configMap.put("job.name", "test-job");
     configMap.put("job.coordinator.system", "kafka");
@@ -81,6 +82,8 @@ public class TestClusterBasedJobCoordinator {
   @Test
   public void testPartitionCountMonitorWithDurableStates() {
     configMap.put("stores.mystore.changelog", "mychangelog");
+    configMap.put(JobConfig.JOB_CONTAINER_COUNT(), "1");
+    
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(new
 MapConfig(configMap));
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
@@ -99,6 +102,8 @@ public class TestClusterBasedJobCoordinator {
 
   @Test
   public void testPartitionCountMonitorWithoutDurableStates() {
+    configMap.put(JobConfig.JOB_CONTAINER_COUNT(), "1");
+    
when(CoordinatorStreamUtil.readConfigFromCoordinatorStream(anyObject())).thenReturn(new
 MapConfig(configMap));
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
index 9ec548d..d6f5e73 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
@@ -40,12 +40,16 @@ public class CoordinatorStreamStoreTestUtil {
   private final Config config;
 
   public CoordinatorStreamStoreTestUtil(Config config) {
+    this(config, "test-kafka");
+  }
+
+  public CoordinatorStreamStoreTestUtil(Config config, String systemName) {
     this.config = config;
     this.systemFactory = new MockCoordinatorStreamSystemFactory();
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
-    SystemConsumer systemConsumer = systemFactory.getConsumer("test-kafka", 
config, new NoOpMetricsRegistry());
-    SystemProducer systemProducer = systemFactory.getProducer("test-kafka", 
config, new NoOpMetricsRegistry());
-    SystemAdmin systemAdmin = systemFactory.getAdmin("test-kafka", config);
+    SystemConsumer systemConsumer = systemFactory.getConsumer(systemName, 
config, new NoOpMetricsRegistry());
+    SystemProducer systemProducer = systemFactory.getProducer(systemName, 
config, new NoOpMetricsRegistry());
+    SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
     this.coordinatorStreamStore = new CoordinatorStreamStore(config, 
systemProducer, systemConsumer, systemAdmin);
     this.coordinatorStreamStore.init();
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 7b7d41f..6035fc2 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -95,6 +95,7 @@ public class MockCoordinatorStreamSystemFactory implements 
SystemFactory {
     String streamName = 
CoordinatorStreamUtil.getCoordinatorStreamName(jobName, jobId);
     SystemStreamPartition systemStreamPartition = new 
SystemStreamPartition(systemName, streamName, new Partition(0));
     mockConsumer = new 
MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+    mockConsumer.register(systemStreamPartition, "0");
     return mockConsumer;
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index b50aa3c..136fe5e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -65,10 +65,6 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
     convertConfigToCoordinatorMessage(config);
   }
 
-  public void addMoreMessages(Config config) {
-    convertConfigToCoordinatorMessage(config);
-  }
-
   public void addMessageEnvelope(IncomingMessageEnvelope envelope) throws 
IOException, InterruptedException {
     put(systemStreamPartition, envelope);
     setIsAtHead(systemStreamPartition, true);
@@ -77,8 +73,8 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
   private void convertConfigToCoordinatorMessage(Config config) {
     try {
       for (Map.Entry<String, String> configPair : config.entrySet()) {
-        byte[] keyBytes = null;
-        byte[] messgeBytes = null;
+        byte[] keyBytes;
+        byte[] messgeBytes;
         if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
           String[] changelogInfo = configPair.getKey().split(":");
           String changeLogPartition = configPair.getValue();
@@ -111,11 +107,5 @@ public class MockCoordinatorStreamWrappedConsumer extends 
BlockingEnvelopeMap {
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  public CountDownLatch blockPool() {
-    blockpollFlag = true;
-    return blockConsumerPoll;
-  }
-
-
   public void stop() {}
 }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 93c7096..971e55f 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -28,6 +28,7 @@ import org.apache.samza.container.TaskName
 import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
 import org.apache.samza.config._
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
CoordinatorStreamStoreTestUtil}
 import org.apache.samza.metrics.MetricsRegistry
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.SystemAdmin
@@ -44,9 +45,9 @@ import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 import org.apache.samza.execution.JobPlanner
+import org.mockito.Mockito
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = _
@@ -145,7 +146,6 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
   def 
testShouldInstantiateCheckpointManagerWithConfigurationFromCoordinatorStream(): 
Unit = {
     val offsetMap: TaskNameToCheckpointMap = Map(tn0 -> Map(new 
SystemStreamPartition("test", "foo", p0) -> "42"),
       tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
-    val coordinatorStreamManager = mock[CoordinatorStreamManager]
     val userDefinedConfig: MapConfig = new MapConfig(Map(
       ApplicationConfig.APP_NAME -> "test",
       JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
@@ -156,16 +156,20 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
       TaskConfig.GROUPER_FACTORY -> 
"org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
     ).asJava)
     val generatedConfigs: MapConfig = 
JobPlanner.generateSingleJobConfig(userDefinedConfig)
-    when(coordinatorStreamManager.getConfig).thenReturn(generatedConfigs)
+    val coordinatorStreamStoreTestUtil: CoordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config)
 
-    val checkpointTool: CheckpointTool = new CheckpointTool(offsetMap, 
coordinatorStreamManager)
+
+    val coordinatorStreamStore: CoordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore
+    val checkpointTool: CheckpointTool = Mockito.spy(new 
CheckpointTool(offsetMap, coordinatorStreamStore, generatedConfigs))
+    
Mockito.when(checkpointTool.getConfigFromCoordinatorStream(coordinatorStreamStore)).thenReturn(userDefinedConfig)
     checkpointTool.run()
 
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn0, new Checkpoint(Map(new 
SystemStreamPartition("test", "foo", p0) -> "42").asJava))
     verify(TestCheckpointTool.checkpointManager)
       .writeCheckpoint(tn1, new Checkpoint(Map(new 
SystemStreamPartition("test", "foo", p1) -> "43").asJava))
-    verify(coordinatorStreamManager).getConfig
-    assert(TestCheckpointTool.coordinatorConfig == generatedConfigs)
+
+    // Two configurations, job.id and job.name, are populated in the coordinator 
config by SamzaRuntime and are not present in the generated config.
+    
assert(generatedConfigs.entrySet().containsAll(TestCheckpointTool.coordinatorConfig.entrySet()))
   }
 }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 5b43840..3c4e4fe 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -25,7 +25,9 @@ import org.apache.samza.Partition
 import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
 import org.apache.samza.config._
 import org.apache.samza.container.{SamzaContainer, TaskName}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, 
MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import 
org.apache.samza.coordinator.stream.{MockCoordinatorStreamSystemFactory, 
MockCoordinatorStreamWrappedConsumer}
 import org.apache.samza.job.MockJobFactory
 import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
@@ -106,16 +108,20 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
 
     // Verify that the atomicReference is initialized
     assertNotNull(JobModelManager.jobModelRef.get())
-    assertEquals(expectedJobModel, JobModelManager.jobModelRef.get())
+    val expectedContainerModels = new util.TreeMap[String, 
ContainerModel](expectedJobModel.getContainers)
+    val actualContainerModels = new util.TreeMap[String, 
ContainerModel](JobModelManager.jobModelRef.get().getContainers)
+    assertEquals(expectedContainerModels, actualContainerModels)
 
     coordinator.start
-    assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
-    assertEquals(expectedJobModel, coordinator.jobModel)
+    val expectedConfig: Config = coordinator.jobModel.getConfig
+    val actualConfig: Config = new MapConfig(config.asJava)
+    assertTrue(expectedConfig.entrySet().containsAll(actualConfig.entrySet()))
+    assertEquals(expectedJobModel.getContainers, 
coordinator.jobModel.getContainers)
 
     val response = HttpUtil.read(coordinator.server.getUrl)
     // Verify that the JobServlet is serving the correct jobModel
     val jobModelFromCoordinatorUrl = 
SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
-    assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
+    assertEquals(expectedJobModel.getContainers, 
jobModelFromCoordinatorUrl.getContainers)
 
     coordinator.stop
   }
@@ -270,22 +276,24 @@ class TestJobCoordinator extends FlatSpec with 
PrivateMethodTester {
   }
 
   def getTestJobModelManager(config: MapConfig) = {
-    val coordinatorStreamManager = new CoordinatorStreamManager(config, new 
MetricsRegistryMap)
-    coordinatorStreamManager.register("TestJobCoordinator")
-    coordinatorStreamManager.start
-    coordinatorStreamManager.bootstrap
-    val changelogPartitionManager = new 
ChangelogStreamManager(coordinatorStreamManager)
-    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogPartitionManager.readPartitionMapping())
-    coordinatorStreamManager.stop()
-    jobModelManager
+    val coordinatorStreamTestUtil: CoordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config, "coordinator")
+    val coordinatorStreamStore: CoordinatorStreamStore = 
coordinatorStreamTestUtil.getCoordinatorStreamStore
+    val namespaceAwareCoordinatorStore: NamespaceAwareCoordinatorStreamStore = 
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE)
+    try {
+      val changelogPartitionManager = new 
ChangelogStreamManager(namespaceAwareCoordinatorStore)
+      val jobModelManager = JobModelManager(config, 
changelogPartitionManager.readPartitionMapping(), coordinatorStreamStore, new 
MetricsRegistryMap())
+      jobModelManager
+    } finally {
+      coordinatorStreamStore.close()
+    }
   }
 
   @Before
   def setUp() {
     // setup the test stream metadata
-    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]());
-    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]());
-    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]());
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(0)), new util.ArrayList[IncomingMessageEnvelope]())
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(1)), new util.ArrayList[IncomingMessageEnvelope]())
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("test", 
"stream1", new Partition(2)), new util.ArrayList[IncomingMessageEnvelope]())
   }
 
   @After
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index a32ea81..3f45e67 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -33,17 +33,22 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.SamzaException;
+import org.apache.samza.clustermanager.ClusterBasedJobCoordinator;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
 import org.apache.samza.storage.ChangelogStreamManager;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.Util;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.apache.samza.util.CommandLine;
@@ -154,26 +159,29 @@ public class YarnJobValidationTool {
 
   public void validateJmxMetrics() throws Exception {
     MetricsRegistry metricsRegistry = new MetricsRegistryMap();
-    CoordinatorStreamManager coordinatorStreamManager = new 
CoordinatorStreamManager(config, metricsRegistry);
-    coordinatorStreamManager.register(getClass().getSimpleName());
-    coordinatorStreamManager.start();
-    coordinatorStreamManager.bootstrap();
-    ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping(), metricsRegistry);
-    validator.init(config);
-    Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-    for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-      String containerId = entry.getKey();
-      String jmxUrl = entry.getValue();
-      log.info("validate container " + containerId + " metrics with JMX: " + 
jmxUrl);
-      JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-      jmxMetrics.connect();
-      validator.validate(jmxMetrics);
-      jmxMetrics.close();
-      log.info("validate container " + containerId + " successfully");
+    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(config, metricsRegistry);
+    coordinatorStreamStore.init();
+    try{
+      Config configFromCoordinatorStream = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
+      ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamStore);
+      JobModelManager jobModelManager = 
JobModelManager.apply(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
+                                                              
coordinatorStreamStore, metricsRegistry);
+      validator.init(config);
+      Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
+        String containerId = entry.getKey();
+        String jmxUrl = entry.getValue();
+        log.info("validate container " + containerId + " metrics with JMX: " + 
jmxUrl);
+        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+        jmxMetrics.connect();
+        validator.validate(jmxMetrics);
+        jmxMetrics.close();
+        log.info("validate container " + containerId + " successfully");
+      }
+      validator.complete();
+    } finally {
+      coordinatorStreamStore.close();
     }
-    validator.complete();
-    coordinatorStreamManager.stop();
   }
 
   public static void main(String [] args) throws Exception {
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 1ad4522..e1dde51 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -23,11 +23,12 @@ import java.io.{BufferedReader, InputStreamReader}
 import java.net.URL
 
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.clustermanager.{ClusterBasedJobCoordinator, 
SamzaApplicationState}
 import org.apache.samza.config.{Config, MapConfig}
-import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, 
MockCoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
CoordinatorStreamStoreTestUtil, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 import org.apache.samza.metrics._
 import org.apache.samza.storage.ChangelogStreamManager
 import org.junit.Assert._
@@ -39,14 +40,14 @@ class TestSamzaYarnAppMasterService {
 
   @Test
   def testAppMasterDashboardShouldStart {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
     val config = getDummyConfig
     val jobModelManager = getTestJobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
     val registry = new MetricsRegistryMap()
 
-    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1);
+    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1)
     val service = new SamzaYarnAppMasterService(config, samzaState, state, 
registry, null)
-    val taskName = new TaskName("test")
 
     // start the dashboard
     service.onInit
@@ -56,15 +57,15 @@ class TestSamzaYarnAppMasterService {
 
     // check to see if it's running
     val url = new URL(state.rpcUrl.toString + "am")
-    val is = url.openConnection().getInputStream();
-    val reader = new BufferedReader(new InputStreamReader(is));
-    var line: String = null;
+    val is = url.openConnection().getInputStream()
+    val reader = new BufferedReader(new InputStreamReader(is))
+    var line: String = null
 
     do {
       line = reader.readLine()
     } while (line != null)
 
-    reader.close();
+    reader.close()
   }
 
   /**
@@ -77,7 +78,7 @@ class TestSamzaYarnAppMasterService {
     val config = getDummyConfig
     val jobModelManager = getTestJobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
-    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1);
+    val state = new YarnAppState(-1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), 
"testHost", 1, 1)
     val registry = new MetricsRegistryMap()
 
     val service = new SamzaYarnAppMasterService(config, samzaState, state, 
registry, null)
@@ -101,14 +102,16 @@ class TestSamzaYarnAppMasterService {
   }
 
   private def getTestJobModelManager(config: Config) = {
-    val coordinatorStreamManager = new CoordinatorStreamManager(config, new 
MetricsRegistryMap)
-    coordinatorStreamManager.register("TestJobCoordinator")
-    coordinatorStreamManager.start
-    coordinatorStreamManager.bootstrap
-    val changelogPartitionManager = new 
ChangelogStreamManager(coordinatorStreamManager)
-    val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogPartitionManager.readPartitionMapping())
-    coordinatorStreamManager.stop()
-    jobModelManager
+    val coordinatorStreamTestUtil: CoordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config)
+    val coordinatorStreamStore: CoordinatorStreamStore = 
coordinatorStreamTestUtil.getCoordinatorStreamStore
+    val namespaceAwareCoordinatorStore: NamespaceAwareCoordinatorStreamStore = 
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE)
+    try {
+      val changelogPartitionManager = new 
ChangelogStreamManager(namespaceAwareCoordinatorStore)
+      val jobModelManager = JobModelManager(getDummyConfig, 
changelogPartitionManager.readPartitionMapping(), coordinatorStreamStore, new 
MetricsRegistryMap())
+      jobModelManager
+    } finally {
+      coordinatorStreamStore.close()
+    }
   }
 
   private def getDummyConfig: Config = new MapConfig(Map[String, String](
@@ -124,8 +127,4 @@ class TestSamzaYarnAppMasterService {
     "yarn.container.retry.window.ms" -> "1999999999",
     "job.coordinator.system" -> "coordinator",
     "systems.coordinator.samza.factory" -> 
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName).asJava)
-}
-
-
-
-
+}
\ No newline at end of file

Reply via email to