This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b9e4a8  SAMZA-2158: Remove the redunant coordinator stream reads in 
the ApplicationMaster startup sequence. (#987)
5b9e4a8 is described below

commit 5b9e4a8bcd88c97b50d2229b52950c7c18a2235e
Author: shanthoosh <[email protected]>
AuthorDate: Mon Apr 8 14:36:52 2019 -0700

    SAMZA-2158: Remove the redunant coordinator stream reads in the 
ApplicationMaster startup sequence. (#987)
    
    * Create only one coordinator metadata store connection in samza-yarn 
application master.
    
    * Fix check-style errors.
    
    * Minor code cleanup.
    
    * Address review comments.
    
    * More code cleanup.
    
    * Fix the import order.
    
    * Fix compilation errors.
    
    * Close coordinator stream store even on exceptions.
    
    * Switch from using unordered list type to paragraph type to format the 
javadocs.
    
    * Address review comments.
    
    * Remove unnecessary <p> tag from javadocs.
---
 .../apache/samza/container/LocalityManager.java    |  43 ++----
 .../grouper/task/TaskAssignmentManager.java        |  55 +++-----
 .../task/TaskPartitionAssignmentManager.java       |  29 ++---
 .../CoordinatorStreamMetadataStoreFactory.java     |   2 +-
 .../metadatastore/CoordinatorStreamStore.java      | 144 ++++++++++++++++-----
 .../NamespaceAwareCoordinatorStreamStore.java      | 112 ++++++++++++++++
 .../apache/samza/processor/StreamProcessor.java    |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  | 129 ++++++++++--------
 .../apache/samza/startpoint/StartpointManager.java |  50 +++----
 .../java/org/apache/samza/zk/ZkJobCoordinator.java |   4 +-
 .../apache/samza/container/SamzaContainer.scala    |  27 +---
 .../apache/samza/coordinator/JobModelManager.scala |  45 +++++--
 .../samza/container/TestLocalityManager.java       |  49 +++----
 .../grouper/task/TestTaskAssignmentManager.java    |  39 ++----
 .../task/TestTaskPartitionAssignmentManager.java   |  29 ++---
 .../CoordinatorStreamStoreTestUtil.java            |  64 +++++++++
 .../metadatastore/TestCoordinatorStreamStore.java  |  62 ++++-----
 .../TestNamespaceAwareCoordinatorStreamStore.java  |  88 +++++++++++++
 .../TestCoordinatorStreamSystemProducer.java       |   1 +
 .../startpoint/StartpointManagerTestUtil.java}     |  24 ++--
 .../samza/startpoint/TestStartpointManager.java    |  37 +++---
 .../samza/checkpoint/TestOffsetManager.scala       |   6 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java      |   9 +-
 .../processor/TestZkLocalApplicationRunner.java    |   4 +-
 24 files changed, 671 insertions(+), 383 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 24ddc23..05f2e8b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,19 +19,13 @@
 
 package org.apache.samza.container;
 
-import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,32 +39,17 @@ public class LocalityManager {
   private final MetadataStore metadataStore;
 
   /**
-   * Builds the LocalityManager based upon {@link Config} and {@link 
MetricsRegistry}.
-   * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before
-   * reading/writing into metadata store.
+   * Builds the LocalityManager based upon the provided {@link MetadataStore} 
that is instantiated.
+   * Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   * and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   * to reuse it across different utility classes. Uses the {@link 
CoordinatorStreamValueSerde} to serialize
+   * messages before reading/writing into metadata store.
    *
-   * @param config the configuration required for setting up metadata store.
-   * @param metricsRegistry the registry for reporting metrics.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write 
the container locality.
    */
-  public LocalityManager(Config config, MetricsRegistry metricsRegistry) {
-    this(config, metricsRegistry, new 
CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE));
-  }
-
-  /**
-   * Builds the LocalityManager based upon {@link Config} and {@link 
MetricsRegistry}.
-   * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs 
before reading/writing
-   * into {@link MetadataStore}.
-   *
-   * Key and value serializer are different for yarn (uses 
CoordinatorStreamMessage) and standalone (native ObjectOutputStream for 
serialization) modes.
-   * @param config the configuration required for setting up metadata store.
-   * @param metricsRegistry the registry for reporting metrics.
-   * @param valueSerde the value serializer.
-   */
-  LocalityManager(Config config, MetricsRegistry metricsRegistry, 
Serde<String> valueSerde) {
-    MetadataStoreFactory metadataStoreFactory = Util.getObj(new 
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
-    this.metadataStore = 
metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, 
metricsRegistry);
-    this.metadataStore.init();
-    this.valueSerde = valueSerde;
+  public LocalityManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE);
   }
 
   /**
@@ -84,7 +63,9 @@ public class LocalityManager {
     metadataStore.all().forEach((containerId, valueBytes) -> {
         if (valueBytes != null) {
           String locationId = valueSerde.fromBytes(valueBytes);
-          allMappings.put(containerId, 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
+          Map<String, String> values = new HashMap<>();
+          values.put(SetContainerHostMapping.HOST_KEY, locationId);
+          allMappings.put(containerId, values);
         }
       });
     if (LOG.isDebugEnabled()) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0bac04d..16f8a51 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -18,22 +18,17 @@
  */
 package org.apache.samza.container.grouper.task;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +40,6 @@ public class TaskAssignmentManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskAssignmentManager.class);
 
   private final Map<String, String> taskNameToContainerId = new HashMap<>();
-  private final Serde<String> keySerde;
   private final Serde<String> containerIdSerde;
   private final Serde<String> taskModeSerde;
 
@@ -53,42 +47,23 @@ public class TaskAssignmentManager {
   private MetadataStore taskModeMappingMetadataStore;
 
   /**
-   * Builds the TaskAssignmentManager based upon {@link Config} and {@link 
MetricsRegistry}.
-   * Uses {@link CoordinatorStreamValueSerde} to serialize messages before 
reading/writing
-   * into the metadata store.
+   * Builds the TaskAssignmentManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   * Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   * and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   * to reuse it across different utility classes. Uses the {@link 
CoordinatorStreamValueSerde} to serialize
+   * messages before reading/writing into metadata store.
    *
-   * @param config the configuration required for setting up metadata store.
-   * @param metricsRegistry the registry for reporting metrics.
+   * @param taskContainerMappingMetadataStore an instance of {@link 
MetadataStore} used to read/write the task to container assignments.
+   * @param taskModeMappingMetadataStore an instance of {@link MetadataStore} 
used to read/write the task to mode  assignments.
    */
-  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry) 
{
-    this(config, metricsRegistry, new 
CoordinatorStreamKeySerde(SetTaskContainerMapping.TYPE),
-         new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE), new 
CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE));
-  }
-
-  /**
-   * Builds the LocalityManager based upon {@link Config} and {@link 
MetricsRegistry}.
-   *
-   * Uses keySerde, containerIdSerde to serialize/deserialize (key, value) 
pairs before reading/writing
-   * into {@link MetadataStore}.
-   *
-   * Key and value serializer are different for yarn(uses 
CoordinatorStreamMessage) and standalone(uses native
-   * ObjectOutputStream for serialization) modes.
-   * @param config the configuration required for setting up metadata store.
-   * @param metricsRegistry the registry for reporting metrics.
-   * @param keySerde the key serializer.
-   * @param containerIdSerde the value serializer.
-   * @param taskModeSerde the task-mode serializer.
-   */
-  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, 
Serde<String> keySerde, Serde<String> containerIdSerde, Serde<String> 
taskModeSerde) {
-    this.keySerde = keySerde;
-    this.containerIdSerde = containerIdSerde;
-    this.taskModeSerde = taskModeSerde;
+  public TaskAssignmentManager(MetadataStore 
taskContainerMappingMetadataStore, MetadataStore taskModeMappingMetadataStore) {
+    Preconditions.checkNotNull(taskContainerMappingMetadataStore, "Metadata 
store cannot be null");
+    Preconditions.checkNotNull(taskModeMappingMetadataStore, "Metadata store 
cannot be null");
 
-    MetadataStoreFactory metadataStoreFactory = Util.getObj(new 
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
-    this.taskModeMappingMetadataStore = 
metadataStoreFactory.getMetadataStore(SetTaskModeMapping.TYPE, config, 
metricsRegistry);
-    this.taskContainerMappingMetadataStore = 
metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, 
metricsRegistry);
-    this.taskModeMappingMetadataStore.init();
-    this.taskContainerMappingMetadataStore.init();
+    this.taskModeMappingMetadataStore = taskModeMappingMetadataStore;
+    this.taskContainerMappingMetadataStore = taskContainerMappingMetadataStore;
+    this.containerIdSerde = new 
CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE);
+    this.taskModeSerde = new 
CoordinatorStreamValueSerde(SetTaskModeMapping.TYPE);
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
index c40d591..7e32f0a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java
@@ -18,18 +18,14 @@
  */
 package org.apache.samza.container.grouper.task;
 
+import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
@@ -54,20 +50,19 @@ public class TaskPartitionAssignmentManager {
   private final MetadataStore metadataStore;
 
   /**
-   * Instantiates the task partition assignment manager with the provided 
metricsRegistry
-   * and config.
-   * @param config the configuration required for connecting with the metadata 
store.
-   * @param metricsRegistry the registry to create and report custom metrics.
+   * Builds the TaskPartitionAssignmentManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   * Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   * and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   * to reuse it across different utility classes. Uses the {@link 
CoordinatorStreamValueSerde} to serialize
+   * messages before reading/writing into metadata store.
+   *
+   * @param metadataStore an instance of {@link MetadataStore} used to 
read/write the task to partition assignments.
    */
-  public TaskPartitionAssignmentManager(Config config, MetricsRegistry 
metricsRegistry) {
-    this(config, metricsRegistry, new 
CoordinatorStreamValueSerde(SetTaskPartitionMapping.TYPE));
-  }
+  public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
+    Preconditions.checkNotNull(metadataStore, "Metdatastore cannot be null.");
 
-  TaskPartitionAssignmentManager(Config config, MetricsRegistry 
metricsRegistry, Serde<String> valueSerde) {
-    this.valueSerde = valueSerde;
-    MetadataStoreFactory metadataStoreFactory = Util.getObj(new 
JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
-    this.metadataStore = 
metadataStoreFactory.getMetadataStore(SetTaskPartitionMapping.TYPE, config, 
metricsRegistry);
-    this.metadataStore.init();
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetTaskPartitionMapping.TYPE);
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
index c38dc56..65df26e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
@@ -31,6 +31,6 @@ public class CoordinatorStreamMetadataStoreFactory implements 
MetadataStoreFacto
 
   @Override
   public MetadataStore getMetadataStore(String namespace, Config config, 
MetricsRegistry metricsRegistry) {
-    return new CoordinatorStreamStore(namespace, config, metricsRegistry);
+    return new CoordinatorStreamStore(config, metricsRegistry);
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 899c87c..87121f1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -18,15 +18,18 @@
  */
 package org.apache.samza.coordinator.metadatastore;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Objects;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
@@ -46,18 +49,24 @@ import org.apache.samza.system.SystemStreamMetadata;
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.util.CoordinatorStreamUtil;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An implementation of the {@link MetadataStore} interface where the metadata 
of the Samza job is stored in coordinator stream.
+ * An implementation of the {@link MetadataStore} interface where the metadata 
of the samza job is stored in coordinator stream.
  *
  * This class is thread safe.
+ *
+ * It is recommended to use {@link NamespaceAwareCoordinatorStreamStore}. This 
will enable the single CoordinatorStreamStore connection
+ * to be shared by the multiple {@link NamespaceAwareCoordinatorStreamStore} 
instances.
  */
 public class CoordinatorStreamStore implements MetadataStore {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorStreamStore.class);
   private static final String SOURCE = "SamzaContainer";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private final Config config;
   private final SystemStream coordinatorSystemStream;
@@ -65,19 +74,17 @@ public class CoordinatorStreamStore implements 
MetadataStore {
   private final SystemProducer systemProducer;
   private final SystemConsumer systemConsumer;
   private final SystemAdmin systemAdmin;
-  private final String type;
-  private final CoordinatorStreamKeySerde keySerde;
 
-  private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+  // Namespaced key to the message byte array.
+  private final Map<String, byte[]> messagesReadFromCoordinatorStream = new 
ConcurrentHashMap<>();
+
   private final Object bootstrapLock = new Object();
   private final AtomicBoolean isInitialized = new AtomicBoolean(false);
 
   private SystemStreamPartitionIterator iterator;
 
-  public CoordinatorStreamStore(String namespace, Config config, 
MetricsRegistry metricsRegistry) {
+  public CoordinatorStreamStore(Config config, MetricsRegistry 
metricsRegistry) {
     this.config = config;
-    this.type = namespace;
-    this.keySerde = new CoordinatorStreamKeySerde(type);
     this.coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
     this.coordinatorSystemStreamPartition = new 
SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     SystemFactory systemFactory = 
CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
@@ -86,6 +93,16 @@ public class CoordinatorStreamStore implements MetadataStore 
{
     this.systemAdmin = 
systemFactory.getAdmin(this.coordinatorSystemStream.getSystem(), config);
   }
 
+  @VisibleForTesting
+  CoordinatorStreamStore(Config config, SystemProducer systemProducer, 
SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+    this.config = config;
+    this.systemConsumer = systemConsumer;
+    this.systemProducer = systemProducer;
+    this.systemAdmin = systemAdmin;
+    this.coordinatorSystemStream = 
CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    this.coordinatorSystemStreamPartition = new 
SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+  }
+
   @Override
   public void init() {
     if (isInitialized.compareAndSet(false, true)) {
@@ -95,41 +112,44 @@ public class CoordinatorStreamStore implements 
MetadataStore {
       systemProducer.register(SOURCE);
       systemProducer.start();
       iterator = new SystemStreamPartitionIterator(systemConsumer, 
coordinatorSystemStreamPartition);
-      bootstrapMessagesFromStream();
+      readMessagesFromCoordinatorStream();
     } else {
       LOG.info("Store had already been initialized. Skipping.", 
coordinatorSystemStreamPartition);
     }
   }
 
   @Override
-  public byte[] get(String key) {
-    bootstrapMessagesFromStream();
-    return bootstrappedMessages.get(key);
+  public byte[] get(String namespacedKey) {
+    readMessagesFromCoordinatorStream();
+    return messagesReadFromCoordinatorStream.get(namespacedKey);
   }
 
   @Override
-  public void put(String key, byte[] value) {
-    OutgoingMessageEnvelope envelope = new 
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keySerde.toBytes(key), 
value);
+  public void put(String namespacedKey, byte[] value) {
+    // 1. Store the namespace and key into correct fields of the 
CoordinatorStreamKey and convert the key to bytes.
+    CoordinatorMessageKey coordinatorMessageKey = 
deserializeCoordinatorMessageKeyFromJson(namespacedKey);
+    CoordinatorStreamKeySerde keySerde = new 
CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
+    byte[] keyBytes = keySerde.toBytes(coordinatorMessageKey.getKey());
+
+    // 2. Set the key, message in correct fields of {@link 
OutgoingMessageEnvelope} and publish it to the coordinator stream.
+    OutgoingMessageEnvelope envelope = new 
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keyBytes, value);
     systemProducer.send(SOURCE, envelope);
     flush();
   }
 
   @Override
-  public void delete(String key) {
-    // Since kafka doesn't support individual message deletion, store value as 
null for a key to delete.
-    put(key, null);
+  public void delete(String namespacedKey) {
+    // Since kafka doesn't support individual message deletion, store value as 
null for a namespacedKey to delete.
+    put(namespacedKey, null);
   }
 
   @Override
   public Map<String, byte[]> all() {
-    bootstrapMessagesFromStream();
-    return Collections.unmodifiableMap(bootstrappedMessages);
+    readMessagesFromCoordinatorStream();
+    return Collections.unmodifiableMap(messagesReadFromCoordinatorStream);
   }
 
-  /**
-   * Returns all the messages from the earliest offset all the way to the 
latest.
-   */
-  private void bootstrapMessagesFromStream() {
+  private void readMessagesFromCoordinatorStream() {
     synchronized (bootstrapLock) {
       while (iterator.hasNext()) {
         IncomingMessageEnvelope envelope = iterator.next();
@@ -137,12 +157,11 @@ public class CoordinatorStreamStore implements 
MetadataStore {
         Serde<List<?>> serde = new JsonSerde<>();
         Object[] keyArray = serde.fromBytes(keyAsBytes).toArray();
         CoordinatorStreamMessage coordinatorStreamMessage = new 
CoordinatorStreamMessage(keyArray, new HashMap<>());
-        if (Objects.equals(coordinatorStreamMessage.getType(), type)) {
-          if (envelope.getMessage() != null) {
-            bootstrappedMessages.put(coordinatorStreamMessage.getKey(), 
(byte[]) envelope.getMessage());
-          } else {
-            bootstrappedMessages.remove(coordinatorStreamMessage.getKey());
-          }
+        String namespacedKey = 
serializeCoordinatorMessageKeyToJson(coordinatorStreamMessage.getType(), 
coordinatorStreamMessage.getKey());
+        if (envelope.getMessage() != null) {
+          messagesReadFromCoordinatorStream.put(namespacedKey, (byte[]) 
envelope.getMessage());
+        } else {
+          messagesReadFromCoordinatorStream.remove(namespacedKey);
         }
       }
     }
@@ -169,6 +188,12 @@ public class CoordinatorStreamStore implements 
MetadataStore {
     }
   }
 
+  /**
+   * <p>
+   *   Fetches the metadata of the topic partition of coordinator stream. 
Registers the oldest offset
+   *   for the topic partition of coordinator stream with the coordinator 
system consumer.
+   * </p>
+   */
   private void registerConsumer() {
     LOG.debug("Attempting to register system stream partition: {}", 
coordinatorSystemStreamPartition);
     String streamName = coordinatorSystemStreamPartition.getStream();
@@ -184,4 +209,65 @@ public class CoordinatorStreamStore implements 
MetadataStore {
     LOG.info("Registering system stream partition: {} with offset: {}.", 
coordinatorSystemStreamPartition, startingOffset);
     systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
   }
+
+  /**
+   *
+   * Serializes the {@link CoordinatorMessageKey} into a json string.
+   *
+   * @param type the type of the coordinator message.
+   * @param key the key associated with the type
+   * @return the CoordinatorMessageKey serialized to a json string.
+   */
+  public static String serializeCoordinatorMessageKeyToJson(String type, 
String key) {
+    try {
+      CoordinatorMessageKey coordinatorMessageKey = new 
CoordinatorMessageKey(key, type);
+      return OBJECT_MAPPER.writeValueAsString(coordinatorMessageKey);
+    } catch (IOException e) {
+      throw new SamzaException(String.format("Exception occurred when 
serializing metadata for type: %s, key: %s", type, key), e);
+    }
+  }
+
+  /**
+   * Deserializes the @param coordinatorMsgKeyAsString in json format to 
{@link CoordinatorMessageKey}.
+   * @param coordinatorMsgKeyAsJson the serialized CoordinatorMessageKey in 
json format.
+   * @return the deserialized CoordinatorMessageKey.
+   */
+  public static CoordinatorMessageKey 
deserializeCoordinatorMessageKeyFromJson(String coordinatorMsgKeyAsJson) {
+    try {
+      return OBJECT_MAPPER.readValue(coordinatorMsgKeyAsJson, 
CoordinatorMessageKey.class);
+    } catch (IOException e) {
+      throw new SamzaException(String.format("Exception occurred when 
deserializing the coordinatorMsgKey: %s", coordinatorMsgKeyAsJson), e);
+    }
+  }
+
+  /**
+   * <p>
+   * Represents the key of a message in the coordinator stream.
+   *
+   * Coordinator message key is composite. It has both the type of the message
+   * and the key associated with the type in it.
+   * </p>
+   */
+  public static class CoordinatorMessageKey {
+
+    // Represents the key associated with the type
+    private final String key;
+
+    // Represents the type of the message.
+    private final String namespace;
+
+    CoordinatorMessageKey(@JsonProperty("key") String key,
+                          @JsonProperty("namespace") String namespace) {
+      this.key = key;
+      this.namespace = namespace;
+    }
+
+    public String getKey() {
+      return this.key;
+    }
+
+    public String getNamespace() {
+      return this.namespace;
+    }
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
new file mode 100644
index 0000000..8a28fae
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.CoordinatorMessageKey;
+import org.apache.samza.metadatastore.MetadataStore;
+
+/**
+ * Provides a namespace aware read/write operations on top of {@link 
CoordinatorStreamStore}.
+ */
+public class NamespaceAwareCoordinatorStreamStore implements MetadataStore {
+
+  private final MetadataStore metadataStore;
+  private final String namespace;
+
+  /**
+   * Instantiates the {@link NamespaceAwareCoordinatorStreamStore} based upon 
the provided
+   * MetadataStore that is instantiated and the namespace.
+   *
+   * @param metadataStore the instantiated {@link MetadataStore}.
+   * @param namespace the namespace to use for storing the keys in the 
metadata store.
+   */
+  public NamespaceAwareCoordinatorStreamStore(MetadataStore metadataStore, 
String namespace) {
+    this.metadataStore = metadataStore;
+    this.namespace = namespace;
+  }
+
+  @Override
+  public void init() {
+    // Metadata store lifecycle is managed outside of this class, so not 
starting it.
+  }
+
+  @Override
+  public byte[] get(String key) {
+    Map<String, byte[]> bootstrappedMessages = 
readMessagesFromCoordinatorStore();
+    return bootstrappedMessages.get(key);
+  }
+
+  @Override
+  public void put(String key, byte[] value) {
+    String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
+    metadataStore.put(coordinatorMessageKeyAsJson, value);
+  }
+
+  @Override
+  public void delete(String key) {
+    String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
+    metadataStore.delete(coordinatorMessageKeyAsJson);
+  }
+
+  @Override
+  public Map<String, byte[]> all() {
+    Map<String, byte[]> bootstrappedMessages = 
readMessagesFromCoordinatorStore();
+    return Collections.unmodifiableMap(bootstrappedMessages);
+  }
+
+  @Override
+  public void flush() {
+    metadataStore.flush();
+  }
+
+  @Override
+  public void close() {
+    // Metadata store lifecycle is managed outside of this class, so not 
stopping it.
+  }
+
+  @VisibleForTesting
+  String getCoordinatorMessageKey(String key) {
+    return 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, key);
+  }
+
+  /**
+   * Reads and returns all the messages from coordinator stream for a 
particular namespace.
+   */
+  private Map<String, byte[]> readMessagesFromCoordinatorStore() {
+    Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+    Map<String, byte[]> coordinatorStreamMessages = metadataStore.all();
+    coordinatorStreamMessages.forEach((coordinatorMessageKeyAsJson, value) -> {
+        CoordinatorMessageKey coordinatorMessageKey = 
CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(coordinatorMessageKeyAsJson);
+        if (Objects.equals(namespace, coordinatorMessageKey.getNamespace())) {
+          if (value != null) {
+            bootstrappedMessages.put(coordinatorMessageKey.getKey(), value);
+          } else {
+            bootstrappedMessages.remove(coordinatorMessageKey.getKey());
+          }
+        }
+      });
+
+    return bootstrappedMessages;
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2c0c0b7..2a8c862 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -334,7 +334,7 @@ public class StreamProcessor {
         this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
         
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        Option.apply(this.externalContextOptional.orElse(null)), null);
+        Option.apply(this.externalContextOptional.orElse(null)), null, null);
   }
 
   private JobCoordinator createJobCoordinator() {
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index b483997..564dbf3 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -23,6 +23,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
@@ -32,9 +33,13 @@ import org.apache.samza.container.SamzaContainer$;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.context.JobContextImpl;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.startpoint.StartpointManager;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaJavaUtil;
@@ -74,62 +79,74 @@ public class ContainerLaunchUtil {
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
-    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
-    LocalityManager localityManager = new LocalityManager(config, new 
MetricsRegistryMap());
-    SamzaContainer container = SamzaContainer$.MODULE$.apply(
-        containerId,
-        jobModel,
-        ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, 
config)),
-        taskFactory,
-        JobContextImpl.fromConfigWithDefaults(config),
-        
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
-        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
-        Option.apply(externalContextOptional.orElse(null)), localityManager);
-
-    ProcessorLifecycleListener listener = 
appDesc.getProcessorLifecycleListenerFactory()
-        .createInstance(new ProcessorContext() { }, config);
-
-    container.setContainerListener(
-        new SamzaContainerListener() {
-          @Override
-          public void beforeStart() {
-            log.info("Before starting the container.");
-            listener.beforeStart();
-          }
-
-          @Override
-          public void afterStart() {
-            log.info("Container Started");
-            listener.afterStart();
-          }
-
-          @Override
-          public void afterStop() {
-            log.info("Container Stopped");
-            listener.afterStop();
-          }
-
-          @Override
-          public void afterFailure(Throwable t) {
-            log.info("Container Failed");
-            containerRunnerException = t;
-            listener.afterFailure(t);
-          }
-        });
-
-    ContainerHeartbeatMonitor heartbeatMonitor = 
createContainerHeartbeatMonitor(container);
-    if (heartbeatMonitor != null) {
-      heartbeatMonitor.start();
-    }
-
-    container.run();
-    if (heartbeatMonitor != null) {
-      heartbeatMonitor.stop();
-    }
-
-    if (containerRunnerException != null) {
-      log.error("Container stopped with Exception. Exiting process now.", 
containerRunnerException);
-      System.exit(1);
+    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap());
+    coordinatorStreamStore.init();
+
+    try {
+      TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+      LocalityManager localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE));
+      Optional<StartpointManager> startpointManager = Optional.empty();
+      if (new JobConfig(config).getStartpointMetadataStoreFactory() != null) {
+        startpointManager = Optional.of(new StartpointManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
StartpointManager.NAMESPACE)));
+      }
+
+      SamzaContainer container = SamzaContainer$.MODULE$.apply(
+          containerId,
+          jobModel,
+          ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, 
config)),
+          taskFactory,
+          JobContextImpl.fromConfigWithDefaults(config),
+          
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
+          
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+          Option.apply(externalContextOptional.orElse(null)), localityManager, 
startpointManager.orElse(null));
+
+      ProcessorLifecycleListener listener = 
appDesc.getProcessorLifecycleListenerFactory()
+          .createInstance(new ProcessorContext() { }, config);
+
+      container.setContainerListener(
+          new SamzaContainerListener() {
+            @Override
+            public void beforeStart() {
+              log.info("Before starting the container.");
+              listener.beforeStart();
+            }
+
+            @Override
+            public void afterStart() {
+              log.info("Container Started");
+              listener.afterStart();
+            }
+
+            @Override
+            public void afterStop() {
+              log.info("Container Stopped");
+              listener.afterStop();
+            }
+
+            @Override
+            public void afterFailure(Throwable t) {
+              log.info("Container Failed");
+              containerRunnerException = t;
+              listener.afterFailure(t);
+            }
+          });
+
+      ContainerHeartbeatMonitor heartbeatMonitor = 
createContainerHeartbeatMonitor(container);
+      if (heartbeatMonitor != null) {
+        heartbeatMonitor.start();
+      }
+
+      container.run();
+      if (heartbeatMonitor != null) {
+        heartbeatMonitor.stop();
+      }
+
+      if (containerRunnerException != null) {
+        log.error("Container stopped with Exception. Exiting process now.", 
containerRunnerException);
+        System.exit(1);
+      }
+    } finally {
+      coordinatorStreamStore.close();
     }
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java 
b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index 6005c92..da7f990 100644
--- 
a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -54,40 +55,45 @@ import org.slf4j.LoggerFactory;
  */
 public class StartpointManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  private static final String NAMESPACE = "samza-startpoint-v1";
+  public static final String NAMESPACE = "samza-startpoint-v1";
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
   private final MetadataStore metadataStore;
   private final StartpointSerde startpointSerde = new StartpointSerde();
 
-  private boolean started = false;
+  private boolean stopped = false;
 
   /**
-   * Constructs a {@link StartpointManager} instance with the provided {@link 
MetadataStoreFactory}
-   * @param metadataStoreFactory {@link MetadataStoreFactory} used to 
construct the underlying store.
-   * @param config {@link Config} required for the underlying store.
-   * @param metricsRegistry {@link MetricsRegistry} to hook into the 
underlying store.
+   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
+   *  This is primarily used for testing.
    */
-  public StartpointManager(MetadataStoreFactory metadataStoreFactory, Config 
config, MetricsRegistry metricsRegistry) {
+  @VisibleForTesting
+  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
     Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
     Preconditions.checkNotNull(config, "Config cannot be null");
     Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
 
     this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, 
config, metricsRegistry);
     LOG.info("StartpointManager created with metadata store: {}", 
metadataStore.getClass().getCanonicalName());
+    this.metadataStore.init();
   }
 
   /**
-   * Starts the underlying {@link MetadataStore}
+   *  Builds the StartpointManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   *  Setting up a metadata store instance is expensive which requires opening 
multiple connections
+   *  and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   *  to reuse it across different utility classes.
+   *
+   * @param metadataStore an instance of {@link MetadataStore} used to 
read/write the start-points.
    */
+  public StartpointManager(MetadataStore metadataStore) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+    this.metadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
+  }
+
   public void start() {
-    if (!started) {
-      metadataStore.init();
-      started = true;
-    } else {
-      LOG.warn("StartpointManager already started");
-    }
+    // Metadata store lifecycle is managed outside of the StartpointManager, 
so not starting it.
   }
 
   /**
@@ -106,7 +112,7 @@ public class StartpointManager {
    * @param startpoint Reference to a Startpoint object.
    */
   public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, 
Startpoint startpoint) {
-    Preconditions.checkState(started, "Underlying metadata store not 
available");
+    Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
     Preconditions.checkNotNull(startpoint, "Startpoint cannot be null");
 
@@ -134,7 +140,7 @@ public class StartpointManager {
    * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null 
if it does not exist or if it is too stale.
    */
   public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName 
taskName) {
-    Preconditions.checkState(started, "Underlying metadata store not 
available");
+    Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
     byte[] startpointBytes = metadataStore.get(toStoreKey(ssp, taskName));
@@ -164,7 +170,7 @@ public class StartpointManager {
    * @param taskName ssp The {@link TaskName} to delete the {@link Startpoint} 
for.
    */
   public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
-    Preconditions.checkState(started, "Underlying metadata store not 
available");
+    Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
 
     metadataStore.delete(toStoreKey(ssp, taskName));
@@ -179,7 +185,7 @@ public class StartpointManager {
    * @return The list of {@link SystemStreamPartition}s that were fanned out 
to SystemStreamPartition+TaskName.
    */
   public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel 
jobModel) {
-    Preconditions.checkState(started, "Underlying metadata store not 
available");
+    Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
 
     HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>();
@@ -222,12 +228,8 @@ public class StartpointManager {
    * Relinquish resources held by the underlying {@link MetadataStore}
    */
   public void stop() {
-    if (started) {
-      metadataStore.close();
-      started = false;
-    } else {
-      LOG.warn("StartpointManager already stopped.");
-    }
+    stopped = true;
+    // Metadata store lifecycle is managed outside of the StartpointManager, 
so not closing it.
   }
 
   @VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9bfc3f8..a0d0e4e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -44,6 +44,7 @@ import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.model.ContainerModel;
@@ -307,7 +308,8 @@ public class ZkJobCoordinator implements JobCoordinator {
       CoordinatorStreamValueSerde jsonSerde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE);
       for (Map.Entry<String, String> entry : config.entrySet()) {
         byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
-        metadataStore.put(entry.getKey(), serializedValue);
+        String configKey = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, 
entry.getKey());
+        metadataStore.put(configKey, serializedValue);
       }
     } finally {
       if (metadataStore != null) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d2b8f8f..307ee35 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -135,7 +135,8 @@ object SamzaContainer extends Logging {
     applicationContainerContextFactoryOption: 
Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
     applicationTaskContextFactoryOption: 
Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
     externalContextOption: Option[ExternalContext],
-    localityManager: LocalityManager = null) = {
+    localityManager: LocalityManager = null,
+    startpointManager: StartpointManager = null) = {
     val config = jobContext.getConfig
     val systemConfig = new SystemConfig(config)
     val containerModel = jobModel.getContainers.get(containerId)
@@ -428,28 +429,12 @@ object SamzaContainer extends Logging {
       .orNull
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val startpointMetadataStoreFactory = 
Option(config.getStartpointMetadataStoreFactory)
-      .map(Util.getObj(_, classOf[MetadataStoreFactory]))
-      .orNull
-    val startpointManager = if (startpointMetadataStoreFactory != null) {
-      try {
-        Option(new StartpointManager(startpointMetadataStoreFactory, config, 
samzaContainerMetrics.registry))
-      } catch {
-        case e: Exception => {
-          error("Unable to get an instance of the StartpointManager. 
Continuing without one.", e)
-          None
-        }
-      }
-    } else {
-      None
-    }
-
     // create a map of consumers with callbacks to pass to the OffsetManager
     val checkpointListeners = 
consumers.filter(_._2.isInstanceOf[CheckpointListener])
       .map { case (system, consumer) => (system, 
consumer.asInstanceOf[CheckpointListener])}
     info("Got checkpointListeners : %s" format checkpointListeners)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, startpointManager.getOrElse(null), systemAdmins, 
checkpointListeners, offsetManagerMetrics)
+    val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, startpointManager, systemAdmins, checkpointListeners, 
offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
     val dropDeserializationError = config.getDropDeserializationErrors
@@ -938,11 +923,11 @@ class SamzaContainer(
         localityManager.writeContainerToHostMapping(containerId, 
hostInet.getHostName)
       } catch {
         case uhe: UnknownHostException =>
-          warn("Received UnknownHostException when persisting locality info 
for container %s: " +
-            "%s" format (containerId, uhe.getMessage))  //No-op
+          warn("Received UnknownHostException when persisting locality info 
for container: " +
+            "%s" format (containerId), uhe)  //No-op
         case unknownException: Throwable =>
           warn("Received an exception when persisting locality info for 
container %s: " +
-            "%s" format (containerId, unknownException.getMessage))
+            "%s" format (containerId), unknownException)
       } finally {
         info("Shutting down locality manager.")
         localityManager.close()
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index fa9d2a7..230a62d 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -27,16 +27,31 @@ import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.Config
-import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, 
SystemStreamPartitionGrouperFactory}
+import org.apache.samza.container.grouper.stream.SSPGrouperProxy
+import 
org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task._
-import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.container.LocalityManager
+import org.apache.samza.container.TaskName
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
+import org.apache.samza.coordinator.server.HttpServer
+import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode, 
TaskModel}
-import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskMode
+import org.apache.samza.job.model.TaskModel
+import org.apache.samza.metadatastore.MetadataStore
+import org.apache.samza.metadatastore.MetadataStoreFactory
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.LocationId
 import org.apache.samza.system._
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -47,6 +62,8 @@ import scala.collection.JavaConverters._
  */
 object JobModelManager extends Logging {
 
+  val SOURCE = "JobModelManager"
+
   /**
    * a volatile value to store the current instantiated 
<code>JobModelManager</code>
    */
@@ -65,9 +82,15 @@ object JobModelManager extends Logging {
    * @return the instantiated {@see JobModelManager}.
    */
   def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, 
Integer], metricsRegistry: MetricsRegistry = new MetricsRegistryMap()): 
JobModelManager = {
-    val localityManager = new LocalityManager(config, metricsRegistry)
-    val taskAssignmentManager = new TaskAssignmentManager(config, 
metricsRegistry)
-    val taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(config, metricsRegistry)
+    val coordinatorStreamStoreFactory: MetadataStoreFactory = new 
CoordinatorStreamMetadataStoreFactory()
+    val coordinatorStreamStore: MetadataStore = 
coordinatorStreamStoreFactory.getMetadataStore(SOURCE, config, metricsRegistry)
+    coordinatorStreamStore.init()
+
+    // Instantiate the respective metadata store util classes which uses the 
same coordinator metadata store.
+    val localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE))
+    val taskAssignmentManager = new TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskContainerMapping.TYPE), new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskModeMapping.TYPE))
+    val taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskPartitionMapping.TYPE))
+
     val systemAdmins = new SystemAdmins(config)
     try {
       systemAdmins.start()
@@ -85,10 +108,8 @@ object JobModelManager extends Logging {
       currentJobModelManager = new JobModelManager(jobModelRef.get(), server, 
localityManager)
       currentJobModelManager
     } finally {
-      taskPartitionAssignmentManager.close()
-      taskAssignmentManager.close()
       systemAdmins.stop()
-      // Not closing localityManager, since {@code ClusterBasedJobCoordinator} 
uses it to read container locality through {@code JobModel}.
+      // Not closing coordinatorStreamStore, since {@code 
ClusterBasedJobCoordinator} uses it to read container locality through {@code 
JobModel}.
     }
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java 
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 387491f..d18ad67 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -24,42 +24,29 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import 
org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
 import 
org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.runner.RunWith;
 import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
 
-/**
- * Unit tests for {@link LocalityManager}
- */
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestLocalityManager {
 
-  private MockCoordinatorStreamSystemFactory 
mockCoordinatorStreamSystemFactory;
-  private final Config config = new MapConfig(ImmutableMap.of("job.name", 
"test-job", "job.coordinator.system", "test-kafka"));
+  private static final Config CONFIG = new 
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", 
"test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
 
   @Before
   public void setup() {
-    mockCoordinatorStreamSystemFactory = new 
MockCoordinatorStreamSystemFactory();
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
-    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
 SystemStream("test-kafka", "test"));
-    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), 
anyObject())).thenReturn("test");
+    coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
   }
 
   @After
@@ -67,8 +54,9 @@ public class TestLocalityManager {
     MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
   }
 
-  @Test public void testLocalityManager() {
-    LocalityManager localityManager = new LocalityManager(config, new 
MetricsRegistryMap());
+  @Test
+  public void testLocalityManager() {
+    LocalityManager localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE));
 
     localityManager.writeContainerToHostMapping("0", "localhost");
     Map<String, Map<String, String>> localMap = 
localityManager.readContainerLocality();
@@ -87,14 +75,15 @@ public class TestLocalityManager {
 
     localityManager.close();
 
-    MockCoordinatorStreamSystemProducer producer = 
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, 
null);
-    MockCoordinatorStreamSystemConsumer consumer = 
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, 
null);
+    MockCoordinatorStreamSystemProducer producer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemConsumer consumer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
 
-  @Test public void testWriteOnlyLocalityManager() {
-    LocalityManager localityManager = new LocalityManager(config, new 
MetricsRegistryMap());
+  @Test
+  public void testWriteOnlyLocalityManager() {
+    LocalityManager localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE));
 
     localityManager.writeContainerToHostMapping("1", "localhost");
 
@@ -104,8 +93,8 @@ public class TestLocalityManager {
 
     localityManager.close();
 
-    MockCoordinatorStreamSystemProducer producer = 
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, 
null);
-    MockCoordinatorStreamSystemConsumer consumer = 
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, 
null);
+    MockCoordinatorStreamSystemProducer producer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemConsumer consumer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 443262e..357e8ae 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -19,46 +19,37 @@
 
 package org.apache.samza.container.grouper.task;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.system.*;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestTaskAssignmentManager {
 
-  private MockCoordinatorStreamSystemFactory 
mockCoordinatorStreamSystemFactory;
+  private static final Config CONFIG = new 
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", 
"test-kafka"));
 
-  private final Config config = new MapConfig(ImmutableMap.of("job.name", 
"test-job", "job.coordinator.system", "test-kafka"));
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private TaskAssignmentManager taskAssignmentManager;
 
   @Before
   public void setup() {
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
-    mockCoordinatorStreamSystemFactory = new 
MockCoordinatorStreamSystemFactory();
-    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
 SystemStream("test-kafka", "test"));
-    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), 
anyObject())).thenReturn("test");
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    taskAssignmentManager = new TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskContainerMapping.TYPE), new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskModeMapping.TYPE));
   }
 
   @After
@@ -68,8 +59,6 @@ public class TestTaskAssignmentManager {
 
   @Test
   public void testTaskAssignmentManager() {
-    TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(config, new MetricsRegistryMap());
-
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", 
"1", "Task2", "2", "Task3", "0", "Task4", "1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
@@ -85,8 +74,6 @@ public class TestTaskAssignmentManager {
 
   @Test
   public void testDeleteMappings() {
-    TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(config, new MetricsRegistryMap());
-
     Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", 
"1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
@@ -106,8 +93,6 @@ public class TestTaskAssignmentManager {
 
   @Test
   public void testTaskAssignmentManagerEmptyCoordinatorStream() {
-    TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(config, new MetricsRegistryMap());
-
     Map<String, String> expectedMap = new HashMap<>();
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
index a141ee3..1a7f0c9 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java
@@ -24,47 +24,34 @@ import java.util.List;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.Partition;
-import org.apache.samza.system.SystemStream;
+import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestTaskPartitionAssignmentManager {
 
   private static final String TEST_SYSTEM = "system";
   private static final String TEST_STREAM = "stream";
   private static final Partition PARTITION = new Partition(0);
+  private static final Config CONFIG = new 
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", 
"test-kafka"));
 
-  private final Config config = new MapConfig(ImmutableMap.of("job.name", 
"test-job", "job.coordinator.system", "test-kafka"));
   private final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, PARTITION);
 
-  private MockCoordinatorStreamSystemFactory 
mockCoordinatorStreamSystemFactory;
   private TaskPartitionAssignmentManager taskPartitionAssignmentManager;
 
   @Before
   public void setup() {
-    mockCoordinatorStreamSystemFactory = new 
MockCoordinatorStreamSystemFactory();
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
-    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
 SystemStream("test-kafka", "test"));
-    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), 
anyObject())).thenReturn("test");
-    taskPartitionAssignmentManager = new 
TaskPartitionAssignmentManager(config, new MetricsRegistryMap());
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    CoordinatorStreamStore coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskPartitionMapping.TYPE));
   }
 
   @After
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
new file mode 100644
index 0000000..9ec548d
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStoreTestUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.metadatastore;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+/**
+ * <p>
+ * Instantiates and initializes a {@link CoordinatorStreamStore} based upon 
the provided configuration.
+ *
+ * Primarily used for testing the metadata read/write flows in 
ApplicationMaster.
+ * </p>
+ */
+public class CoordinatorStreamStoreTestUtil {
+
+  private final CoordinatorStreamStore coordinatorStreamStore;
+  private final MockCoordinatorStreamSystemFactory systemFactory;
+  private final Config config;
+
+  public CoordinatorStreamStoreTestUtil(Config config) {
+    this.config = config;
+    this.systemFactory = new MockCoordinatorStreamSystemFactory();
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+    SystemConsumer systemConsumer = systemFactory.getConsumer("test-kafka", 
config, new NoOpMetricsRegistry());
+    SystemProducer systemProducer = systemFactory.getProducer("test-kafka", 
config, new NoOpMetricsRegistry());
+    SystemAdmin systemAdmin = systemFactory.getAdmin("test-kafka", config);
+    this.coordinatorStreamStore = new CoordinatorStreamStore(config, 
systemProducer, systemConsumer, systemAdmin);
+    this.coordinatorStreamStore.init();
+  }
+
+  public CoordinatorStreamStore getCoordinatorStreamStore() {
+    return coordinatorStreamStore;
+  }
+
+  public 
MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer 
getMockCoordinatorStreamSystemProducer() {
+    return systemFactory.getCoordinatorStreamSystemProducer(config, null);
+  }
+
+  public 
MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer 
getMockCoordinatorStreamSystemConsumer() {
+    return systemFactory.getCoordinatorStreamSystemConsumer(config, null);
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index 2f6f598..d5da781 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -19,99 +19,83 @@
 package org.apache.samza.coordinator.metadatastore;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
 import java.util.*;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestCoordinatorStreamStore {
 
+  private static final String NAMESPACE = "namespace";
+  private static final Config CONFIG = new 
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", 
"test-kafka"));
+
   private CoordinatorStreamStore coordinatorStreamStore;
+  private NamespaceAwareCoordinatorStreamStore 
namespaceAwareCoordinatorStreamStore;
 
   @Before
-  public void setUp() {
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
-    Map<String, String> configMap = ImmutableMap.of("job.name", "test-job",
-                                                    "job.coordinator.system", 
"test-kafka");
-    MockCoordinatorStreamSystemFactory systemFactory = new 
MockCoordinatorStreamSystemFactory();
-    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(systemFactory);
-    
when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new
 SystemStream("test-kafka", "test"));
-    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), 
anyObject())).thenReturn("test");
-    coordinatorStreamStore = new 
CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap), 
new MetricsRegistryMap());
-    coordinatorStreamStore.init();
+  public void setup() {
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    namespaceAwareCoordinatorStreamStore = new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, NAMESPACE);
   }
 
   @Test
   public void testReadAfterWrite() {
-    String key = "test-key1";
+    String key = getCoordinatorMessageKey("test-key1");
     byte[] value = getValue("test-value1");
     Assert.assertNull(coordinatorStreamStore.get(key));
     coordinatorStreamStore.put(key, value);
     Assert.assertEquals(value, coordinatorStreamStore.get(key));
-    Assert.assertEquals(1, coordinatorStreamStore.all().size());
+    Assert.assertEquals(1, namespaceAwareCoordinatorStreamStore.all().size());
   }
 
   @Test
   public void testReadAfterDelete() {
-    String key = "test-key1";
+    String key = getCoordinatorMessageKey("test-key1");
     byte[] value = getValue("test-value1");
     Assert.assertNull(coordinatorStreamStore.get(key));
     coordinatorStreamStore.put(key, value);
     Assert.assertEquals(value, coordinatorStreamStore.get(key));
     coordinatorStreamStore.delete(key);
     Assert.assertNull(coordinatorStreamStore.get(key));
-    Assert.assertEquals(0, coordinatorStreamStore.all().size());
+    Assert.assertEquals(0, namespaceAwareCoordinatorStreamStore.all().size());
   }
 
   @Test
   public void testReadOfNonExistentKey() {
     Assert.assertNull(coordinatorStreamStore.get("randomKey"));
-    Assert.assertEquals(0, coordinatorStreamStore.all().size());
+    Assert.assertEquals(0, namespaceAwareCoordinatorStreamStore.all().size());
   }
 
   @Test
   public void testMultipleUpdatesForSameKey() {
-    String key = "test-key1";
+    String key = getCoordinatorMessageKey("test-key1");
     byte[] value = getValue("test-value1");
     byte[] value1 = getValue("test-value2");
     coordinatorStreamStore.put(key, value);
     coordinatorStreamStore.put(key, value1);
     Assert.assertEquals(value1, coordinatorStreamStore.get(key));
-    Assert.assertEquals(1, coordinatorStreamStore.all().size());
+    Assert.assertEquals(1, namespaceAwareCoordinatorStreamStore.all().size());
   }
 
   @Test
   public void testAllEntries() {
-    String key = "test-key1";
-    String key1 = "test-key2";
-    String key2 = "test-key3";
+    String key = getCoordinatorMessageKey("test-key1");
+    String key1 = getCoordinatorMessageKey("test-key2");
+    String key2 = getCoordinatorMessageKey("test-key3");
     byte[] value = getValue("test-value1");
     byte[] value1 = getValue("test-value2");
     byte[] value2 = getValue("test-value3");
     coordinatorStreamStore.put(key, value);
     coordinatorStreamStore.put(key1, value1);
     coordinatorStreamStore.put(key2, value2);
-    ImmutableMap<String, byte[]> expected = ImmutableMap.of(key, value, key1, 
value1, key2, value2);
-    Assert.assertEquals(expected, coordinatorStreamStore.all());
+    ImmutableMap<String, byte[]> expected = ImmutableMap.of("test-key1", 
value, "test-key2", value1, "test-key3", value2);
+    Assert.assertEquals(expected, namespaceAwareCoordinatorStreamStore.all());
   }
 
   private byte[] getValue(String value) {
@@ -119,4 +103,8 @@ public class TestCoordinatorStreamStore {
     SetTaskContainerMapping setTaskContainerMapping = new 
SetTaskContainerMapping("testSource", "testTask", value);
     return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
   }
+
+  private static String getCoordinatorMessageKey(String key) {
+    return 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(NAMESPACE, key);
+  }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
new file mode 100644
index 0000000..06197f6
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestNamespaceAwareCoordinatorStreamStore {
+  private static final String KEY1 = "testKey";
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private NamespaceAwareCoordinatorStreamStore 
namespaceAwareCoordinatorStreamStore;
+  private String namespace;
+
+  @Before
+  public void setUp() {
+    namespace = RandomStringUtils.randomAlphabetic(5);
+    coordinatorStreamStore = Mockito.mock(CoordinatorStreamStore.class);
+    namespaceAwareCoordinatorStreamStore = new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, namespace);
+  }
+
+  @Test
+  public void testGetShouldDelegateTheInvocationToUnderlyingStore() {
+    String value = RandomStringUtils.randomAlphabetic(5);
+    String namespacedKey = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+    
Mockito.when(coordinatorStreamStore.all()).thenReturn(ImmutableMap.of(namespacedKey,
 value.getBytes(StandardCharsets.UTF_8)));
+
+    Assert.assertArrayEquals(value.getBytes(StandardCharsets.UTF_8), 
namespaceAwareCoordinatorStreamStore.get(KEY1));
+    Mockito.verify(coordinatorStreamStore).all();
+  }
+
+  @Test
+  public void testPutShouldDelegateTheInvocationToUnderlyingStore() {
+    String value = RandomStringUtils.randomAlphabetic(5);
+    String namespacedKey = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+
+    Mockito.doNothing().when(coordinatorStreamStore).put(namespacedKey, 
value.getBytes(StandardCharsets.UTF_8));
+
+    namespaceAwareCoordinatorStreamStore.put(KEY1, 
value.getBytes(StandardCharsets.UTF_8));
+
+    Mockito.verify(coordinatorStreamStore).put(namespacedKey, 
value.getBytes(StandardCharsets.UTF_8));
+  }
+
+  @Test
+  public void testDeleteShouldDelegateTheInvocationToUnderlyingStore() {
+    String namespacedKey = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+
+    namespaceAwareCoordinatorStreamStore.delete(KEY1);
+
+    Mockito.verify(coordinatorStreamStore).delete(namespacedKey);
+  }
+
+  @Test
+  public void testAllShouldDelegateToUnderlyingMetadaStore() {
+    String value = RandomStringUtils.randomAlphabetic(5);
+    String key2 = "key2";
+    String key3 = "key3";
+    byte[] valueAsBytes = value.getBytes(StandardCharsets.UTF_8);
+    String namespacedKey1 = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
+    String namespacedKey2 = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson("namespace1", key2);
+    String namespacedKey3 = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson("namespace1", key3);
+    
Mockito.when(coordinatorStreamStore.all()).thenReturn(ImmutableMap.of(namespacedKey1,
 valueAsBytes, namespacedKey2, new byte[0], namespacedKey3, new byte[0]));
+
+    Assert.assertEquals(ImmutableMap.of(KEY1, valueAsBytes), 
namespaceAwareCoordinatorStreamStore.all());
+    Mockito.verify(coordinatorStreamStore).all();
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 9e5e06c..d6ca2ca 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 public class TestCoordinatorStreamSystemProducer {
   @Test
   public void testCoordinatorStreamSystemProducer() {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
     String source = "source";
     SystemStream systemStream = new SystemStream("system", "stream");
     MockCoordinatorSystemProducer systemProducer = new 
MockCoordinatorSystemProducer(source);
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
 
b/samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
similarity index 55%
copy from 
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
copy to 
samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
index c38dc56..f8eb855 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
+++ 
b/samza-core/src/test/java/org/apache/samza/startpoint/StartpointManagerTestUtil.java
@@ -16,21 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.coordinator.metadatastore;
+package org.apache.samza.startpoint;
 
-import org.apache.samza.config.Config;
-import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
+import org.apache.samza.util.NoOpMetricsRegistry;
 
-/**
- * Builds the {@link CoordinatorStreamStore} based upon the provided {@link 
Config}
- * and {@link MetricsRegistry}.
- */
-public class CoordinatorStreamMetadataStoreFactory implements 
MetadataStoreFactory {
 
-  @Override
-  public MetadataStore getMetadataStore(String namespace, Config config, 
MetricsRegistry metricsRegistry) {
-    return new CoordinatorStreamStore(namespace, config, metricsRegistry);
+public class StartpointManagerTestUtil {
+
+  private StartpointManagerTestUtil() {
+  }
+
+  public static StartpointManager getStartpointManager() {
+    return new StartpointManager(new InMemoryMetadataStoreFactory(), new 
MapConfig(), new NoOpMetricsRegistry());
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
 
b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index cb20148..cc86882 100644
--- 
a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -28,33 +28,44 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metadatastore.InMemoryMetadataStore;
-import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-
 public class TestStartpointManager {
 
+  private static final Config CONFIG = new 
MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", 
"test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private StartpointManager startpointManager;
+
+  @Before
+  public void setup() {
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    startpointManager = new StartpointManager(coordinatorStreamStore);
+  }
+
   @Test
   public void testDefaultMetadataStore() {
-    MapConfig config = new MapConfig();
-    StartpointManager startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory(), config, new NoOpMetricsRegistry());
+    StartpointManager startpointManager = new 
StartpointManager(coordinatorStreamStore);
     Assert.assertNotNull(startpointManager);
-    Assert.assertEquals(InMemoryMetadataStore.class, 
startpointManager.getMetadataStore().getClass());
+    Assert.assertEquals(NamespaceAwareCoordinatorStreamStore.class, 
startpointManager.getMetadataStore().getClass());
   }
 
   @Test
   public void testNoLongerUsableAfterStop() {
-    MapConfig config = new MapConfig();
-    StartpointManager startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory(), config, new NoOpMetricsRegistry());
+    StartpointManager startpointManager = new 
StartpointManager(coordinatorStreamStore);
     startpointManager.start();
     SystemStreamPartition ssp =
         new SystemStreamPartition("mockSystem", "mockStream", new 
Partition(2));
@@ -101,7 +112,6 @@ public class TestStartpointManager {
 
   @Test
   public void testBasics() {
-    StartpointManager startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
     startpointManager.start();
     SystemStreamPartition ssp =
         new SystemStreamPartition("mockSystem", "mockStream", new 
Partition(2));
@@ -161,10 +171,8 @@ public class TestStartpointManager {
   }
 
   @Test
-  public void testStaleStartpoints() throws InterruptedException {
-    StartpointManager startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
-    SystemStreamPartition ssp =
-        new SystemStreamPartition("mockSystem", "mockStream", new 
Partition(2));
+  public void testStaleStartpoints() {
+    SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", 
"mockStream", new Partition(2));
     TaskName taskName = new TaskName("MockTask");
 
     startpointManager.start();
@@ -180,7 +188,6 @@ public class TestStartpointManager {
   @Test
   public void testGroupStartpointsPerTask() {
     MapConfig config = new MapConfig();
-    StartpointManager startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
     startpointManager.start();
     SystemStreamPartition sspBroadcast =
         new SystemStreamPartition("mockSystem1", "mockStream1", new 
Partition(2));
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 5e45547..0c009a1 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -25,11 +25,9 @@ import java.util.function.BiConsumer
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
-import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory
-import org.apache.samza.startpoint.{StartpointManager, StartpointOldest, 
StartpointUpcoming}
+import org.apache.samza.startpoint.{StartpointManagerTestUtil, 
StartpointOldest, StartpointUpcoming}
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, 
SystemStreamPartitionMetadata}
 import org.apache.samza.system._
-import org.apache.samza.util.NoOpMetricsRegistry
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Matchers._
@@ -526,7 +524,7 @@ class TestOffsetManager {
   }
 
   private def getStartpointManager() = {
-    val startpointManager = new StartpointManager(new 
InMemoryMetadataStoreFactory, new MapConfig, new NoOpMetricsRegistry)
+    val startpointManager = StartpointManagerTestUtil.getStartpointManager()
     startpointManager.start
     startpointManager
   }
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index daf665b..7789aca 100644
--- 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -35,8 +35,12 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
@@ -130,9 +134,10 @@ public class SamzaTaskProxy implements TaskProxy {
    * @return list of {@link Task} constructed from job model in coordinator 
stream.
    */
   protected List<Task> 
readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
-    LocalityManager localityManager = new 
LocalityManager(consumer.getConfig(), new MetricsRegistryMap());
+    CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap());
+    LocalityManager localityManager = new 
LocalityManager(coordinatorStreamStore);
     Map<String, Map<String, String>> containerIdToHostMapping = 
localityManager.readContainerLocality();
-    TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(consumer.getConfig(), new MetricsRegistryMap());
+    TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskContainerMapping.TYPE), new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskModeMapping.TYPE));
     Map<String, String> taskNameToContainerIdMapping = 
taskAssignmentManager.readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
     List<String> storeNames = 
JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e6ce045..ad0b94c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -53,6 +53,7 @@ import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
@@ -717,8 +718,9 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     Map<String, String> configMap = new HashMap<>();
     CoordinatorStreamValueSerde jsonSerde = new 
CoordinatorStreamValueSerde("set-config");
     metadataStore.all().forEach((key, value) -> {
+        CoordinatorStreamStore.CoordinatorMessageKey coordinatorMessageKey = 
CoordinatorStreamStore.deserializeCoordinatorMessageKeyFromJson(key);
         String deserializedValue = jsonSerde.fromBytes(value);
-        configMap.put(key, deserializedValue);
+        configMap.put(coordinatorMessageKey.getKey(), deserializedValue);
       });
     return new MapConfig(configMap);
   }

Reply via email to