This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b4a9fe6  SAMZA-2402: Tie Container placement service and Container 
placement handler and validate placement requests (#1267)
b4a9fe6 is described below

commit b4a9fe67238d6eb81b2c61177cafd276ade67c07
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Feb 18 12:32:05 2020 -0800

    SAMZA-2402: Tie Container placement service and Container placement handler 
and validate placement requests (#1267)
    
    Tie Container placement service and Container placement handler and 
validate placement requests
---
 .../ContainerPlacementResponseMessage.java         |   6 +-
 .../clustermanager/ClusterBasedJobCoordinator.java |  11 +-
 .../samza/clustermanager/ContainerManager.java     | 133 +++++++----
 .../clustermanager/ContainerProcessManager.java    |   9 +-
 .../ContainerPlacementMetadata.java                |  14 +-
 .../placement/ContainerPlacementMetadataStore.java |   5 +-
 .../ContainerPlacementRequestAllocator.java        |  19 +-
 .../apache/samza/util/BoundedLinkedHashSet.java    |  53 +++++
 .../TestContainerAllocatorWithHostAffinity.java    |  23 +-
 .../TestContainerAllocatorWithoutHostAffinity.java |  18 +-
 .../TestContainerPlacementActions.java             | 263 +++++++++++++++++----
 .../TestContainerProcessManager.java               |  47 +++-
 12 files changed, 480 insertions(+), 121 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
 
b/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
index 996bfe8..73d8287 100644
--- 
a/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
+++ 
b/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
@@ -42,7 +42,11 @@ public class ContainerPlacementResponseMessage extends 
ContainerPlacementMessage
     this(uuid, deploymentId, processorId, destinationHost, null, statusCode, 
responseMessage, timestamp);
   }
 
-  static ContainerPlacementResponseMessage 
fromContainerPlacementRequestMessage(
+  /**
+   * Creates a {@link ContainerPlacementResponseMessage} using {@link 
ContainerPlacementRequestMessage}
+   * status of current request and response associated with status
+   */
+  public static ContainerPlacementResponseMessage 
fromContainerPlacementRequestMessage(
       ContainerPlacementRequestMessage requestMessage, StatusCode statusCode, 
String responseMessage, long timestamp) {
     return new ContainerPlacementResponseMessage(requestMessage.getUuid(), 
requestMessage.getDeploymentId(), requestMessage.getProcessorId(),
         requestMessage.getDestinationHost(), 
requestMessage.getRequestExpiry(), statusCode, responseMessage, timestamp);
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 2b9636c..f894bcf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -38,6 +38,7 @@ import 
org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.classloader.IsolatingClassLoaderFactory;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -217,12 +218,16 @@ public class ClusterBasedJobCoordinator {
     this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
     this.jobCoordinatorSleepInterval = 
clusterManagerConfig.getJobCoordinatorSleepInterval();
 
+    // build metastore for container placement messages
+    containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(metadataStore);
+
     // build a container process Manager
     containerProcessManager = createContainerProcessManager();
 
     // build utils related to container placements
-    containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(metadataStore);
-    containerPlacementRequestAllocator = new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, 
containerProcessManager);
+    containerPlacementRequestAllocator =
+        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, 
containerProcessManager,
+            new ApplicationConfig(config));
     this.containerPlacementRequestAllocatorThread =
         new Thread(containerPlacementRequestAllocator, "Samza-" + 
ContainerPlacementRequestAllocator.class.getSimpleName());
   }
@@ -452,7 +457,7 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics);
+    return new ContainerProcessManager(config, state, metrics, 
containerPlacementMetadataStore);
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 79076d8..c924447 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -25,9 +25,12 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import 
org.apache.samza.clustermanager.container.placements.ContainerPlacementMetadata;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
+import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
+import org.apache.samza.util.BoundedLinkedHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +55,13 @@ public class ContainerManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerManager.class);
   private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+  private static final int UUID_CACHE_SIZE = 20000;
 
   /**
+   * Container placement metadata store to write responses to control actions
+   */
+  private final ContainerPlacementMetadataStore 
containerPlacementMetadataStore;
+  /**
    * Resource-manager, used to stop containers
    */
   private final ClusterResourceManager clusterResourceManager;
@@ -65,15 +73,23 @@ public class ContainerManager {
    * Key is chosen to be processorId since at a time only one placement action 
can be in progress on a container.
    */
   private final ConcurrentHashMap<String, ContainerPlacementMetadata> actions;
+  /**
+   * In-memory cache of placement requests UUIDs de-queued from the metadata 
store. Used to de-dup requests with the same
+   * request UUID. Sized using max tolerable memory footprint and max likely 
duplicate-spacing.
+   */
+  private final BoundedLinkedHashSet<UUID> placementRequestsCache;
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
-  public ContainerManager(SamzaApplicationState samzaApplicationState, 
ClusterResourceManager clusterResourceManager,
+  public ContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
       boolean hostAffinityEnabled, boolean standByEnabled) {
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
+    this.placementRequestsCache = new 
BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
     this.hostAffinityEnabled = hostAffinityEnabled;
+    this.containerPlacementMetadataStore = containerPlacementMetadataStore;
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
@@ -110,12 +126,14 @@ public class ContainerManager {
       ContainerPlacementMetadata actionMetaData = 
getPlacementActionMetadata(processorId).get();
       ContainerPlacementMetadata.ContainerStatus actionStatus = 
actionMetaData.getContainerStatus();
       if (samzaApplicationState.runningProcessors.containsKey(processorId) && 
actionStatus == ContainerPlacementMetadata.ContainerStatus.RUNNING) {
-        LOG.info("Requesting running container to shutdown due to existing 
container placement action {}", actionMetaData);
+        LOG.debug("Requesting running container to shutdown due to existing 
ContainerPlacement action {}", actionMetaData);
         
actionMetaData.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS);
+        updateContainerPlacementActionStatus(actionMetaData, 
ContainerPlacementMessage.StatusCode.IN_PROGRESS,
+            "Active container stop in progress");
         
clusterResourceManager.stopStreamProcessor(samzaApplicationState.runningProcessors.get(processorId));
         return false;
       } else if (actionStatus == 
ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {
-        LOG.info("Waiting for running container to shutdown due to existing 
container placement action {}", actionMetaData);
+        LOG.info("Waiting for running container to shutdown due to existing 
ContainerPlacement action {}", actionMetaData);
         return false;
       } else if (actionStatus == 
ContainerPlacementMetadata.ContainerStatus.STOPPED) {
         allocator.runStreamProcessor(request, preferredHost);
@@ -210,9 +228,10 @@ public class ContainerManager {
   void handleContainerLaunchSuccess(String processorId) {
     if (hasActiveContainerPlacementAction(processorId)) {
       ContainerPlacementMetadata metadata = 
getPlacementActionMetadata(processorId).get();
-      metadata.setActionStatus(ContainerPlacementMessage.StatusCode.SUCCEEDED,
-              "Successfully completed the container placement action");
-      LOG.info("Container placement action succeeded {}", metadata);
+      // Mark the active container running again and dispatch a response
+      
metadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.RUNNING);
+      updateContainerPlacementActionStatus(metadata, 
ContainerPlacementMessage.StatusCode.SUCCEEDED,
+          "Successfully completed the container placement action");
     }
   }
 
@@ -279,40 +298,49 @@ public class ContainerManager {
    * When host affinity is enabled move / restart is allowed on specific or 
ANY_HOST
    * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
    *
+   * Container placement requests are tied to deploymentId which is currently 
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
+   * On job restarts container placement requests queued for the previous 
deployment are deleted using this
+   *
    * @param requestMessage request containing logical processor id 0,1,2 and 
host where container is desired to be moved,
    *                       acceptable values of this param are any valid 
hostname or "ANY_HOST"(in this case the request
    *                       is sent to resource manager for any host)
    * @param containerAllocator to request physical resources
    */
   public void 
registerContainerPlacementAction(ContainerPlacementRequestMessage 
requestMessage, ContainerAllocator containerAllocator) {
-    LOG.info("Received a ContainerPlacement action request: {}", 
requestMessage);
     String processorId = requestMessage.getProcessorId();
     String destinationHost = requestMessage.getDestinationHost();
-    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
-        checkValidControlAction(processorId, destinationHost, 
requestMessage.getUuid());
-
+    // Is the action ready to be de-queued and taken or it needs to wait to be 
executed in future
+    if (!deQueueAction(requestMessage)) {
+      return;
+    }
+    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus = 
validatePlacementAction(requestMessage);
+    LOG.info("ContainerPlacement action is de-queued metadata: {}", 
requestMessage);
+    // Action is de-queued upon so we record it in the cache
+    placementRequestsCache.put(requestMessage.getUuid());
+    // Remove the request message from metastore since this message is already 
acted upon
+    
containerPlacementMetadataStore.deleteContainerPlacementRequestMessage(requestMessage.getUuid());
     // Request is bad just update the response on message & return
     if (actionStatus.getKey() == 
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+      writeContainerPlacementResponseMessage(requestMessage, 
actionStatus.getKey(), actionStatus.getValue());
       return;
     }
 
     SamzaResource currentResource = 
samzaApplicationState.runningProcessors.get(processorId);
-    LOG.info(
-        "Processor ID: {} matched an active container with deployment ID: {} 
is running on host: {} for ContainerPlacement action: {}",
+    LOG.info("Processor ID: {} matched an active container with containerId 
ID: {} is running on host: {} for ContainerPlacement action: {}",
         processorId, currentResource.getContainerId(), 
currentResource.getHost(), requestMessage);
 
+    // TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers 
to specific host
     if (!hostAffinityEnabled) {
-      LOG.info("Changing the requested host for placement action to {} because 
host affinity is disabled",
-          ResourceRequestState.ANY_HOST);
+      LOG.info("Changing the requested host for placement action to {} because 
host affinity is disabled", ResourceRequestState.ANY_HOST);
       destinationHost = ANY_HOST;
     }
 
     SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(processorId, destinationHost);
     ContainerPlacementMetadata actionMetaData = new 
ContainerPlacementMetadata(requestMessage, currentResource.getHost());
     // Record the resource request for monitoring
-    
actionMetaData.setActionStatus(ContainerPlacementMessage.StatusCode.IN_PROGRESS,
 "Preferred Resources requested");
     actionMetaData.recordResourceRequest(resourceRequest);
     actions.put(processorId, actionMetaData);
+    updateContainerPlacementActionStatus(actionMetaData, 
ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources 
requested");
     containerAllocator.issueResourceRequest(resourceRequest);
     LOG.info("Issued resource request for preferred resources for 
ContainerPlacement action: {}", actionMetaData);
   }
@@ -342,8 +370,7 @@ public class ContainerManager {
   }
 
   private void markContainerPlacementActionFailed(ContainerPlacementMetadata 
metaData, String failureMessage) {
-    metaData.setActionStatus(ContainerPlacementMessage.StatusCode.FAILED, 
failureMessage);
-    LOG.info("Container Placement action failed with metadata {}", metaData);
+    updateContainerPlacementActionStatus(metaData, 
ContainerPlacementMessage.StatusCode.FAILED, failureMessage);
   }
 
   /**
@@ -354,7 +381,6 @@ public class ContainerManager {
     if (metadata.isPresent()) {
       switch (metadata.get().getActionStatus()) {
         case ACCEPTED:
-        case CREATED:
         case IN_PROGRESS:
           return true;
         default:
@@ -371,39 +397,62 @@ public class ContainerManager {
     return Optional.ofNullable(this.actions.get(processorId));
   }
 
+  private void updateContainerPlacementActionStatus(ContainerPlacementMetadata 
metadata,
+      ContainerPlacementMessage.StatusCode statusCode, String responseMessage) 
{
+    metadata.setActionStatus(statusCode, responseMessage);
+    writeContainerPlacementResponseMessage(metadata.getRequestMessage(), 
statusCode, responseMessage);
+    LOG.info("Status updated for ContainerPlacement action: {}", metadata);
+  }
+
+  private void 
writeContainerPlacementResponseMessage(ContainerPlacementRequestMessage 
requestMessage,
+      ContainerPlacementMessage.StatusCode statusCode, String responseMessage) 
{
+    containerPlacementMetadataStore.writeContainerPlacementResponseMessage(
+        
ContainerPlacementResponseMessage.fromContainerPlacementRequestMessage(requestMessage,
 statusCode,
+            responseMessage, System.currentTimeMillis()));
+  }
+
   /**
-   * A valid container placement action is only issued for a running processor 
with a valid processor id which has no
-   * in flight container requests. Duplicate actions are handled by deduping 
on uuid
+   * If there is an existing inflight request or container is pending a start, 
the container placement action shall wait
+   * until this in-flight action is complete
    *
-   * TODO: SAMZA-2402: Disallow pending Container Placement actions in 
metastore on job restarts
+   * @param requestMessage container placement request message
+   * @return true if action should be taken right now, false if it needs to 
wait to be taken in future
    */
-  private Pair<ContainerPlacementMessage.StatusCode, String> 
checkValidControlAction(String processorId, String destinationHost, UUID uuid) {
-    String errorMessagePrefix =
-        String.format("ControlAction to move or restart container with 
processor id %s to host %s is rejected due to",
-            processorId, destinationHost);
+  private boolean deQueueAction(ContainerPlacementRequestMessage 
requestMessage) {
+    // Do not dequeue action wait for the in-flight action to complete
+    if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
+      LOG.info("ContainerPlacement request: {} is en-queued because container 
has an in-progress placement action", requestMessage);
+      return false;
+    }
+    // Do not dequeue the action wait for the container to come to a running 
state
+    if 
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
+        || 
samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId()))
 {
+      LOG.info("ContainerPlacement request: {} is en-queued because container 
is pending start", requestMessage);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * A valid container placement action needs a valid processor id. Duplicate 
actions are handled by de-duping on uuid.
+   *
+   * @param requestMessage container placement request message
+   * @return Pair<ContainerPlacementMessage.StatusCode, String> which is 
status code & response suggesting if the request is valid
+   */
+  private Pair<ContainerPlacementMessage.StatusCode, String> 
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
+    String errorMessagePrefix = String.format("ContainerPlacement request: %s 
is rejected due to", requestMessage);
     Boolean invalidAction = false;
     String errorMessage = null;
     if (standbyContainerManager.isPresent()) {
-      errorMessage = String.format("%s not supported for host standby 
enabled", errorMessagePrefix);
+      errorMessage = String.format("%s not supported for hot standby enabled", 
errorMessagePrefix);
       invalidAction = true;
-    } else if (processorId == null || destinationHost == null) {
-      errorMessage = String.format("%s either processor id or the host 
argument is null", errorMessagePrefix);
+    } else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
+      errorMessage = String.format("%s duplicate UUID of the request, please 
retry", errorMessagePrefix);
       invalidAction = true;
-    } else if (Integer.parseInt(processorId) >= 
samzaApplicationState.processorCount.get()) {
+    } else if (Integer.parseInt(requestMessage.getProcessorId()) >= 
samzaApplicationState.processorCount.get()
+    ) {
       errorMessage = String.format("%s invalid processor id", 
errorMessagePrefix);
       invalidAction = true;
-    } else if (hasActiveContainerPlacementAction(processorId)) {
-      errorMessage = String.format("%s existing container placement action on 
container with metadata %s", errorMessagePrefix,
-          actions.get(processorId));
-      invalidAction = true;
-    } else if (actions.get(processorId) != null && 
actions.get(processorId).getUuid().equals(uuid)) {
-      errorMessage = String.format("%s duplicate Container Placement request 
received, previous action taken: %s", errorMessagePrefix,
-          actions.get(processorId));
-      invalidAction = true;
-    } else if 
(!samzaApplicationState.runningProcessors.containsKey(processorId)
-        || samzaApplicationState.pendingProcessors.containsKey(processorId)) {
-      errorMessage = String.format("%s container is either is not running or 
is in pending state", errorMessagePrefix);
-      invalidAction = true;
     }
 
     if (invalidAction) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 6ba7e7f..7f99159 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -127,7 +128,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   private JvmMetrics jvmMetrics;
   private Map<String, MetricsReporter> metricsReporters;
 
-  public ContainerProcessManager(Config config, SamzaApplicationState state, 
MetricsRegistryMap registry) {
+  public ContainerProcessManager(Config config, SamzaApplicationState state, 
MetricsRegistryMap registry,
+      ContainerPlacementMetadataStore metadataStore) {
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
     this.jobConfig = new JobConfig(config);
@@ -159,7 +161,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> 
reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(state, 
clusterResourceManager, hostAffinityEnabled, 
jobConfig.getStandbyTasksEnabled());
+    this.containerManager = new ContainerManager(metadataStore, state, 
clusterResourceManager, hostAffinityEnabled,
+        jobConfig.getStandbyTasksEnabled());
 
     this.containerAllocator = new 
ContainerAllocator(this.clusterResourceManager, config, state, 
hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
@@ -572,7 +575,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
    * @param requestMessage request containing details of the desited container 
placement action
    */
   public void 
registerContainerPlacementAction(ContainerPlacementRequestMessage 
requestMessage) {
-    // Call the ContainerManager#registerContainerPlacementAction
+    containerManager.registerContainerPlacementAction(requestMessage, 
containerAllocator);
   }
 
   private Duration getRetryDelay(String processorId) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
similarity index 89%
rename from 
samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
rename to 
samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
index fc521a1..15c9e1c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.clustermanager.container.placements;
+package org.apache.samza.clustermanager.container.placement;
 
 import java.time.Duration;
 import java.util.HashSet;
@@ -88,6 +88,10 @@ public class ContainerPlacementMetadata {
     return actionStatus;
   }
 
+  public ContainerPlacementRequestMessage getRequestMessage() {
+    return requestMessage;
+  }
+
   public synchronized String getResponseMessage() {
     return responseMessage;
   }
@@ -111,12 +115,16 @@ public class ContainerPlacementMetadata {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("ContainerPlacementMetadata{");
-    sb.append("Request= ").append(requestMessage);
+    sb.append(" UUID: ").append(requestMessage.getUuid());
+    sb.append(", Processor ID: ").append(requestMessage.getProcessorId());
+    sb.append(", 
deploymentId='").append(requestMessage.getDeploymentId()).append('\'');
+    sb.append(", 
destinationHost='").append(requestMessage.getDestinationHost()).append('\'');
+    sb.append(", requestExpiry=").append(requestMessage.getRequestExpiry());
     sb.append(", sourceHost='").append(sourceHost).append('\'');
-    sb.append(", resourceRequests=").append(resourceRequests);
     sb.append(", actionStatus=").append(actionStatus);
     sb.append(", containerStatus=").append(containerStatus);
     sb.append(", responseMessage='").append(responseMessage).append('\'');
+    sb.append(", resourceRequests=").append(resourceRequests);
     sb.append('}');
     return sb.toString();
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
index 02f698d..593cf3d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -132,7 +133,7 @@ public class ContainerPlacementMetadataStore {
    * only to write responses to Container Placement Action
    * @param message
    */
-  void 
writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage 
message) {
+  public void 
writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage 
message) {
     Preconditions.checkState(!stopped, "Underlying metadata store not 
available");
     Preconditions.checkNotNull(message);
     try {
@@ -264,6 +265,8 @@ public class ContainerPlacementMetadataStore {
         throw new SamzaException(e);
       }
     }
+    // Sort the actions in order of timestamp
+    
newActions.sort(Comparator.comparingLong(ContainerPlacementRequestMessage::getTimestamp));
     return newActions;
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
index f01ac41..5161cfb 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
@@ -20,12 +20,15 @@ package org.apache.samza.clustermanager.container.placement;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.clustermanager.ContainerProcessManager;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Stateless handler that periodically dispatches {@link 
ContainerPlacementRequestMessage} read from Metadata store to Job Coordinator
+ * Container placement requests from the previous deployment are deleted from 
the metadata store, ContainerPlacementRequestAllocatorThread
+ * does this cleanup
  */
 public class ContainerPlacementRequestAllocator implements Runnable {
 
@@ -43,13 +46,18 @@ public class ContainerPlacementRequestAllocator implements 
Runnable {
    * State that controls the lifecycle of the 
ContainerPlacementRequestAllocator thread
    */
   private volatile boolean isRunning;
+  /**
+   * RunId of the app
+   */
+  private final String appRunId;
 
-  public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore 
containerPlacementMetadataStore, ContainerProcessManager manager) {
+  public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore 
containerPlacementMetadataStore, ContainerProcessManager manager, 
ApplicationConfig config) {
     Preconditions.checkNotNull(containerPlacementMetadataStore, 
"containerPlacementMetadataStore cannot be null");
     Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be 
null");
     this.containerProcessManager = manager;
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
     this.isRunning = true;
+    this.appRunId = config.getRunId();
   }
 
   @Override
@@ -59,8 +67,13 @@ public class ContainerPlacementRequestAllocator implements 
Runnable {
         for (ContainerPlacementRequestMessage message : 
containerPlacementMetadataStore.readAllContainerPlacementRequestMessages()) {
           // We do not need to dispatch ContainerPlacementResponseMessage 
because they are written from JobCoordinator
           // in response to a Container Placement Action
-          LOG.info("Received a container placement message {}", message);
-          containerProcessManager.registerContainerPlacementAction(message);
+          if (message.getDeploymentId().equals(appRunId)) {
+            LOG.debug("Received a container placement message {}", message);
+            containerProcessManager.registerContainerPlacementAction(message);
+          } else {
+            // Delete the ContainerPlacementMessages from the previous 
deployment
+            
containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
+          }
         }
         
Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
       } catch (InterruptedException e) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java 
b/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java
new file mode 100644
index 0000000..c99e475
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * A {@link LinkedHashSet} of bounded size {@code size} with a FIFO eviction policy: once
+ * {@code size} elements are held, adding a new element evicts the oldest one.
+ *
+ * This class is not thread-safe
+ */
+public class BoundedLinkedHashSet<T> {
+
+  private final int cacheSize;
+  private final Set<T> cache;
+
+  /**
+   * @param size maximum number of elements retained; a value below one still retains the
+   *             most recently added element
+   */
+  public BoundedLinkedHashSet(int size) {
+    this.cache = new LinkedHashSet<T>();
+    this.cacheSize = size;
+  }
+
+  /**
+   * @param element element to look up
+   * @return true if {@code element} is currently held by this set
+   */
+  public boolean containsKey(T element) {
+    return cache.contains(element);
+  }
+
+  /**
+   * Adds {@code element}, evicting the oldest element first when the set is already full.
+   * Adding an element that is already present changes nothing and evicts nothing.
+   *
+   * @param element element to add
+   */
+  public void put(T element) {
+    if (cache.contains(element)) {
+      return;
+    }
+    if (cache.size() >= cacheSize) {
+      Iterator<T> iterator = cache.iterator();
+      if (iterator.hasNext()) {
+        // Iterator.remove() is only legal after next(); the first element is the oldest
+        iterator.next();
+        iterator.remove();
+      }
+    }
+    cache.add(element);
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index b7a527d..bf2eabf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -28,11 +28,14 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -60,7 +63,8 @@ public class TestContainerAllocatorWithHostAffinity {
   private final SamzaApplicationState state = new 
SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-  private final ContainerManager containerManager = new 
ContainerManager(state, clusterResourceManager, true, false);
+  private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+  private ContainerManager containerManager;
 
   private JobModelManager initializeJobModelManager(Config config, int 
containerCount) {
     Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -83,6 +87,12 @@ public class TestContainerAllocatorWithHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config);
+    CoordinatorStreamStore coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    coordinatorStreamStore.init();
+    containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
+    containerPlacementMetadataStore.start();
+    containerManager = new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager, true, false);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, 
containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -361,7 +371,8 @@ public class TestContainerAllocatorWithHostAffinity {
   public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() 
throws Exception {
     ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
-    ContainerManager containerManager = new ContainerManager(state, 
mockClusterResourceManager, true, false);
+    ContainerManager containerManager =
+        new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
         SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, 
List.class).get(0);
@@ -407,7 +418,8 @@ public class TestContainerAllocatorWithHostAffinity {
   @Test
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new 
MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, 
spyManager, true, false));
+    ContainerManager spyContainerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyManager, true, false));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, 
spyContainerManager));
     // Request Preferred Resources
@@ -450,7 +462,8 @@ public class TestContainerAllocatorWithHostAffinity {
   public void 
testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws 
Exception {
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new 
MockClusterResourceManager(callback, state));
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, 
spyClusterResourceManager, true, false));
+    ContainerManager spyContainerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyClusterResourceManager, true, false));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, 
spyContainerManager));
@@ -596,6 +609,8 @@ public class TestContainerAllocatorWithHostAffinity {
         put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
         put("job.host-affinity.enabled", "true");
+        put("job.name", "test-job");
+        put("job.coordinator.system", "test-kafka");
       }
     });
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b81e249..9d55218 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,10 +23,13 @@ import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -57,6 +60,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
   private final SamzaApplicationState state = new 
SamzaApplicationState(jobModelManager);
   private final MockClusterResourceManager manager = new 
MockClusterResourceManager(callback, state);
 
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+
   private ContainerAllocator containerAllocator;
   private MockContainerRequestState requestState;
   private Thread allocatorThread;
@@ -66,8 +72,13 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config);
+    CoordinatorStreamStore coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    coordinatorStreamStore.init();
+    containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
+    containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(state, manager, false, false));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, 
false, false));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = 
containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -94,6 +105,8 @@ public class TestContainerAllocatorWithoutHostAffinity {
         put("systems.test-system.samza.factory", 
"org.apache.samza.system.MockSystemFactory");
         put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
         put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
+        put("job.name", "test-job");
+        put("job.coordinator.system", "test-kafka");
       }
     });
 
@@ -262,7 +275,8 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
     ClusterResourceManager.Callback mockCPM = 
mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new 
MockClusterResourceManager(mockCPM, state);
-    ContainerManager spyContainerManager = spy(new ContainerManager(state, 
mockManager, false, false));
+    ContainerManager spyContainerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockManager, false, false));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, 
spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index fd0008e..b433cba 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -28,15 +28,21 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
-import 
org.apache.samza.clustermanager.container.placements.ContainerPlacementMetadata;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
+import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
+import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -53,6 +59,9 @@ import org.mockito.stubbing.Answer;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+/**
+ * Set of Integration tests for container placement actions
+ */
 @RunWith(MockitoJUnitRunner.class)
 public class TestContainerPlacementActions {
 
@@ -71,11 +80,23 @@ public class TestContainerPlacementActions {
       put("systems.test-system.samza.factory", 
"org.apache.samza.system.MockSystemFactory");
       put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
       put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
+      put("job.name", "test-job");
+      put("job.coordinator.system", "test-kafka");
+      put("app.run.id", "appAttempt-001");
     }
   };
 
   private Config config = new MapConfig(configVals);
 
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+
+  private SamzaApplicationState state;
+  private ContainerManager containerManager;
+  private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
+  private ContainerProcessManager cpm;
+  private ClusterResourceManager.Callback callback;
+
   private Config getConfig() {
     Map<String, String> map = new HashMap<>();
     map.putAll(config);
@@ -108,26 +129,27 @@ public class TestContainerPlacementActions {
   @Before
   public void setup() throws Exception {
     server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
-  }
-
-  @Test(timeout = 10000)
-  public void testContainerSuccessfulMoveAction() throws Exception {
+    // Utils Related to Container Placement Metadata store
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    coordinatorStreamStore.init();
+    containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
+    containerPlacementMetadataStore.start();
+    // Utils Related to Cluster manager:
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
-
-    SamzaApplicationState state =
-        new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
-
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
+    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+    callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
-    ContainerManager containerManager = spy(new ContainerManager(state, 
clusterResourceManager, true, false));
-    MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
-        new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
-    ContainerProcessManager cpm =
-        new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
+    allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
+    cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+  }
 
+  @Test(timeout = 10000)
+  public void testContainerSuccessfulMoveAction() throws Exception {
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
         new Thread(() -> {
@@ -179,6 +201,7 @@ public class TestContainerPlacementActions {
     ContainerPlacementRequestMessage requestMessage =
         new ContainerPlacementRequestMessage(UUID.randomUUID(), 
"appAttempt-001", "0", "host-3",
             System.currentTimeMillis());
+
     ContainerPlacementMetadata metadata =
         
containerManager.registerContainerPlacementActionForTest(requestMessage, 
allocatorWithHostAffinity);
 
@@ -197,24 +220,120 @@ public class TestContainerPlacementActions {
     assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
     assertEquals(state.anyHostRequests.get(), 0);
     assertEquals(metadata.getActionStatus(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+    assertResponseMessage(responseMessage.get(), requestMessage);
   }
 
-  @Test(timeout = 10000)
-  public void 
testContainerMoveActionExpiredRequestNotAffectRunningContainers() throws 
Exception {
-    Map<String, String> conf = new HashMap<>();
-    conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
+  @Test(timeout = 30000)
+  public void testActionQueuingForConsecutivePlacementActions() throws 
Exception {
+    // Spawn a Request Allocator Thread
+    Thread requestAllocatorThread = new Thread(
+        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config)),
+        "ContainerPlacement Request Allocator Thread");
+    requestAllocatorThread.start();
 
-    SamzaApplicationState state =
-        new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onResourcesAvailable((List<SamzaResource>) args[0]);
+          }, "AMRMClientAsync").start();
+        return null;
+      }
+    }).when(callback).onResourcesAvailable(anyList());
 
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
-    MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = spy(new ContainerManager(state, 
clusterResourceManager, true, false));
-    MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
-        new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
-    ContainerProcessManager cpm =
-        new ContainerProcessManager(new ClusterManagerConfig(new 
MapConfig(conf)), state, new MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]);
+          }, "AMRMClientAsync").start();
+        return null;
+      }
+    }).when(callback).onStreamProcessorLaunchSuccess(any());
+
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]);
+          }, "AMRMClientAsync").start();
+        return null;
+      }
+    }).when(callback).onResourcesCompleted(anyList());
+
+    cpm.start();
+
+    if (!allocatorWithHostAffinity.awaitContainersStart(2, 5, 
TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+
+    while (state.runningProcessors.size() != 2) {
+      Thread.sleep(100);
+    }
+
+    // App is in running state with two containers running
+    assertEquals(state.runningProcessors.size(), 2);
+    assertEquals(state.runningProcessors.get("0").getHost(), "host-1");
+    assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
+    assertEquals(state.preferredHostRequests.get(), 2);
+    assertEquals(state.anyHostRequests.get(), 0);
+
+    // Initiate container placement action to move a container with container 
id 0
+
+    UUID requestUUIDMove1 = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0", "host-3",
+        null, System.currentTimeMillis());
+
+    UUID requestUUIDMoveBad = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-002",
 "0", "host-4",
+        null, System.currentTimeMillis());
+
+    UUID requestUUIDMove2 = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0", "host-4",
+        null, System.currentTimeMillis());
+
+    // Wait for the ControlAction to complete
+    while (true) {
+      if 
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).isPresent()
 &&
+          
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).get().getStatusCode()
+              == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+        break;
+      }
+      Thread.sleep(Duration.ofSeconds(5).toMillis());
+    }
+
+    assertEquals(state.preferredHostRequests.get(), 4);
+    assertEquals(state.runningProcessors.size(), 2);
+    assertEquals(state.runningProcessors.get("0").getHost(), "host-4");
+    assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
+    assertEquals(state.anyHostRequests.get(), 0);
+
+    Optional<ContainerPlacementResponseMessage> responseMessageMove1 =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove1);
+
+    Optional<ContainerPlacementResponseMessage> responseMessageMove2 =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2);
+
+    assertTrue(responseMessageMove1.isPresent());
+    assertEquals(responseMessageMove1.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+    assertTrue(responseMessageMove2.isPresent());
+    assertEquals(responseMessageMove2.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+    // Request should be deleted as soon as it is accepted / being acted upon
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove1).isPresent());
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove2).isPresent());
+
+    // Requests from Previous deploy must be cleaned
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+  }
+
+  @Test(timeout = 10000)
+  public void 
testContainerMoveActionExpiredRequestNotAffectRunningContainers() throws 
Exception {
 
     // Mimic the behavior of Expired request
     doAnswer(new Answer<Void>() {
@@ -276,25 +395,19 @@ public class TestContainerPlacementActions {
     assertEquals(state.runningProcessors.get("0").getHost(), "host-1");
     assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
     assertEquals(state.anyHostRequests.get(), 0);
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.FAILED);
+    assertResponseMessage(responseMessage.get(), requestMessage);
+    // Request shall be deleted as soon as it is acted upon
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
 
   @Test(timeout = 10000)
   public void 
testActiveContainerLaunchFailureOnControlActionShouldFallbackToSourceHost() 
throws Exception {
-    Map<String, String> conf = new HashMap<>();
-    conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
-
-    SamzaApplicationState state =
-        new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
-
-    ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
-    MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = spy(new ContainerManager(state, 
clusterResourceManager, true, false));
-    MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
-        new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
-    ContainerProcessManager cpm =
-        new ContainerProcessManager(new ClusterManagerConfig(new 
MapConfig(conf)), state, new MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
-
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
         new Thread(() -> {
@@ -375,6 +488,16 @@ public class TestContainerPlacementActions {
     assertEquals(state.anyHostRequests.get(), 0);
     // Control Action should be failed in this case
     assertEquals(metadata.getActionStatus(), 
ContainerPlacementMessage.StatusCode.FAILED);
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.FAILED);
+    assertResponseMessage(responseMessage.get(), requestMessage);
+
+    // Request shall be deleted as soon as it is acted upon
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
 
   @Test(timeout = 10000)
@@ -385,7 +508,8 @@ public class TestContainerPlacementActions {
         new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, 
clusterResourceManager, false, false);
+    ContainerManager containerManager =
+        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, false, false);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, 
new MapConfig(conf), state,
             containerManager);
@@ -479,6 +603,36 @@ public class TestContainerPlacementActions {
     assertEquals(3, state.anyHostRequests.get());
     // Action should success
     assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, 
metadata.getActionStatus());
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+    assertResponseMessage(responseMessage.get(), requestMessage);
+
+    /**
+     * Inject a duplicate request and check it is not accepted
+     */
+    ContainerPlacementRequestMessage duplicateRequestToBeIgnored =
+        new ContainerPlacementRequestMessage(requestMessage.getUuid(), 
"app-attempt-001", "1",
+            "host-3", System.currentTimeMillis());
+
+    // Request with a dup uuid should not be accepted
+    metadata = 
containerManager.registerContainerPlacementActionForTest(duplicateRequestToBeIgnored,
+        allocatorWithoutHostAffinity);
+    // metadata should be from the previous completed action
+    assertTrue(metadata == null || metadata.getUuid() != 
duplicateRequestToBeIgnored.getUuid());
+
+    responseMessage =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+    assertResponseMessage(responseMessage.get(), duplicateRequestToBeIgnored);
+
+    // Request shall be deleted as soon as it is acted upon
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
 
   @Test(expected = NullPointerException.class)
@@ -487,7 +641,8 @@ public class TestContainerPlacementActions {
         new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = spy(new ContainerManager(state, 
clusterResourceManager, true, false));
+    ContainerManager containerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
@@ -521,6 +676,13 @@ public class TestContainerPlacementActions {
     assertBadRequests("2", "host8", containerManager, 
allocatorWithHostAffinity);
   }
 
+  private void assertResponseMessage(ContainerPlacementResponseMessage 
responseMessage,
+      ContainerPlacementRequestMessage requestMessage) {
+    assertEquals(responseMessage.getProcessorId(), 
requestMessage.getProcessorId());
+    assertEquals(responseMessage.getDeploymentId(), 
requestMessage.getDeploymentId());
+    assertEquals(responseMessage.getDestinationHost(), 
requestMessage.getDestinationHost());
+  }
+
   private void assertBadRequests(String processorId, String destinationHost, 
ContainerManager containerManager,
       ContainerAllocator allocator) {
     ContainerPlacementRequestMessage requestMessage =
@@ -529,5 +691,14 @@ public class TestContainerPlacementActions {
     ContainerPlacementMetadata metadata =
         containerManager.registerContainerPlacementActionForTest(requestMessage, allocator);
     assertNull(metadata);
+
+    Optional<ContainerPlacementResponseMessage> responseMessage =
+        containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+    assertTrue(responseMessage.isPresent());
+    assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+    assertResponseMessage(responseMessage.get(), requestMessage);
+    // Request shall be deleted as soon as it is acted upon
+    assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index eca6d6c..14771e7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -28,12 +28,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -77,9 +80,12 @@ public class TestContainerProcessManager {
       put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
       put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
       put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
+      put("job.name", "test-job");
+      put("job.coordinator.system", "test-kafka");
     }
   };
   private Config config = new MapConfig(configVals);
+  private ContainerPlacementMetadataStore containerPlacementMetadataStore;
 
   private Config getConfig() {
     Map<String, String> map = new HashMap<>();
@@ -122,6 +128,11 @@ public class TestContainerProcessManager {
   @Before
   public void setup() throws Exception {
     server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+    CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
+    CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    coordinatorStreamStore.init();
+    containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
+    containerPlacementMetadataStore.start();
   }
 
   private Field getPrivateFieldFromCpm(String fieldName, ContainerProcessManager object) throws Exception {
@@ -141,7 +152,8 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager, true, false);
+    ContainerManager containerManager =
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -185,7 +197,8 @@ public class TestContainerProcessManager {
     ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
@@ -252,7 +265,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -299,7 +313,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -397,7 +412,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -482,7 +498,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -591,7 +608,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -631,7 +649,7 @@ public class TestContainerProcessManager {
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager,
+    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
         Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
@@ -666,7 +684,7 @@ public class TestContainerProcessManager {
         "1", "host2")));
     MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new ContainerManager(state, clusterResourceManager,
+    ContainerManager containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
         Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
 
     MockContainerAllocatorWithHostAffinity allocator = new MockContainerAllocatorWithHostAffinity(
@@ -731,7 +749,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(conf)));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -805,7 +824,8 @@ public class TestContainerProcessManager {
     MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(new MapConfig(config)));
     ContainerManager containerManager =
-        new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false);
+        new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -897,7 +917,8 @@ public class TestContainerProcessManager {
 
   private ContainerProcessManager buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, Optional<ContainerAllocator> allocator) {
-    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager, allocator,
-         new ContainerManager(state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false));
+    return new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), clusterResourceManager,
+        allocator, new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
+        clusterManagerConfig.getHostAffinityEnabled(), false));
   }
 }

Reply via email to