This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d018d00  HDDS-5805. remove containerStateManager V1 code (#2727)
d018d00 is described below

commit d018d00ee8ea6e99a9f849ad16278eda11c0d334
Author: Jackson Yao <[email protected]>
AuthorDate: Fri Oct 29 07:31:04 2021 +0800

    HDDS-5805. remove containerStateManager V1 code (#2727)
---
 .../hdds/scm/container/ContainerManager.java       |   2 +-
 .../hdds/scm/container/ContainerManagerImpl.java   |   4 +-
 .../hdds/scm/container/ContainerStateManager.java  | 570 ++++-----------------
 .../scm/container/ContainerStateManagerImpl.java   |  10 +-
 .../scm/container/ContainerStateManagerV2.java     | 189 -------
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  18 +-
 .../scm/container/TestContainerReportHandler.java  | 267 +++++-----
 .../scm/container/TestContainerStateManager.java   |  91 +++-
 .../TestIncrementalContainerReportHandler.java     | 126 +++--
 .../hdds/scm/container/TestReplicationManager.java | 370 +++++++------
 .../scm/container/TestUnknownContainerReport.java  |  47 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |   8 +-
 .../TestContainerStateManagerIntegration.java      |   2 +-
 13 files changed, 664 insertions(+), 1040 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 81445ea..74e70bb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -192,5 +192,5 @@ public interface ContainerManager extends Closeable {
    * Returns containerStateManger.
    * @return containerStateManger
    */
-  ContainerStateManagerV2 getContainerStateManager();
+  ContainerStateManager getContainerStateManager();
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index d316cd8..5e220df 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -83,7 +83,7 @@ public class ContainerManagerImpl implements ContainerManager 
{
   /**
    *
    */
-  private final ContainerStateManagerV2 containerStateManager;
+  private final ContainerStateManager containerStateManager;
 
   private final SCMHAManager haManager;
   private final SequenceIdGenerator sequenceIdGen;
@@ -443,7 +443,7 @@ public class ContainerManagerImpl implements 
ContainerManager {
 
   // Remove this after fixing Recon
   @VisibleForTesting
-  public ContainerStateManagerV2 getContainerStateManager() {
+  public ContainerStateManager getContainerStateManager() {
     return containerStateManager;
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index e5ddba9..dbd10a3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,530 +18,170 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.states.ContainerState;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.Table;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AtomicLongMap;
-import static 
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * A container state manager keeps track of container states and returns
- * containers that match various queries.
- * <p>
- * This state machine is driven by a combination of server and client actions.
- * <p>
- * This is how a create container happens: 1. When a container is created, the
- * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
- * has chosen a pipeline for container to live on. However, the container is 
not
- * created yet. This container along with the pipeline is returned to the
- * client.
- * <p>
- * 2. The client when it sees the Container state as ALLOCATED understands that
- * container needs to be created on the specified pipeline. The client lets the
- * SCM know that saw this flag and is initiating the on the data nodes.
- * <p>
- * This is done by calling into notifyObjectCreation(ContainerName,
- * BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
- * into CREATING. All this state means is that SCM told Client to create a
- * container and client saw that request.
- * <p>
- * 3. Then client makes calls to datanodes directly, asking the datanodes to
- * create the container. This is done with the help of pipeline that supports
- * this container.
- * <p>
- * 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifying the containerName and the
- * COMPLETE_CREATE as the Event.
- * <p>
- * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
- * the state when clients can write to a container.
- * <p>
- * 6. If the client does not respond with the COMPLETE_CREATE event with a
- * certain time, the state machine times out and triggers a delete operation of
- * the container.
- * <p>
- * Please see the function initializeStateMachine below to see how this looks 
in
- * code.
- * <p>
- * Reusing existing container :
- * <p>
- * The create container call is not made all the time, the system tries to use
- * open containers as much as possible. So in those cases, it looks thru the
- * list of open containers and will return containers that match the specific
- * signature.
- * <p>
- * Please note : Logically there are 3 separate state machines in the case of
- * containers.
- * <p>
- * The Create State Machine -- Commented extensively above.
- * <p>
- * Open/Close State Machine - Once the container is in the Open State,
- * eventually it will be closed, once sufficient data has been written to it.
- * <p>
- * TimeOut Delete Container State Machine - if the container creating times 
out,
- * then Container State manager decides to delete the container.
+ * A ContainerStateManager is responsible for keeping track of all the
+ * containers and their states inside SCM. It also exposes methods to read
+ * and modify the containers and their states.
+ *
+ * All the mutation operations are marked with {@link Replicate} annotation so
+ * that when SCM-HA is enabled, the mutations are replicated from leader SCM
+ * to the followers.
+ *
+ * When a method is marked with the {@link Replicate} annotation, it should
+ * follow the rules below.
+ *
+ * 1. The method call should be idempotent
+ * 2. Arguments should be protobuf objects
+ * 3. Return type should be a protobuf object
+ * 4. The declaration should throw RaftException
+ *
  */
-public class ContainerStateManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStateManager.class);
-
-  private final StateMachine<HddsProtos.LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
-
-  private final long containerSize;
-  private final boolean autoCreateRatisOne;
-  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
-  private final ContainerStateMap containers;
-  private final AtomicLong containerCount;
-  private final AtomicLongMap<LifeCycleState> containerStateCount =
-      AtomicLongMap.create();
+public interface ContainerStateManager {
+
+  /* **********************************************************************
+   * Container Life Cycle                                                 *
+   *                                                                      *
+   * Event and State Transition Mapping:                                  *
+   *                                                                      *
+   * State: OPEN         ----------------> CLOSING                        *
+   * Event:                    FINALIZE                                   *
+   *                                                                      *
+   * State: CLOSING      ----------------> QUASI_CLOSED                   *
+   * Event:                  QUASI_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSING      ----------------> CLOSED                         *
+   * Event:                     CLOSE                                     *
+   *                                                                      *
+   * State: QUASI_CLOSED ----------------> CLOSED                         *
+   * Event:                  FORCE_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSED       ----------------> DELETING                       *
+   * Event:                    DELETE                                     *
+   *                                                                      *
+   * State: DELETING     ----------------> DELETED                        *
+   * Event:                    CLEANUP                                    *
+   *                                                                      *
+   *                                                                      *
+   * Container State Flow:                                                *
+   *                                                                      *
+   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]        *
+   *          (FINALIZE)      |      (QUASI_CLOSE)        |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                  (CLOSE) |             (FORCE_CLOSE) |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                          +--------->[CLOSED]<--------+               *
+   *                                        |                             *
+   *                                (DELETE)|                             *
+   *                                        |                             *
+   *                                        |                             *
+   *                                   [DELETING]                         *
+   *                                        |                             *
+   *                              (CLEANUP) |                             *
+   *                                        |                             *
+   *                                        V                             *
+   *                                    [DELETED]                         *
+   *                                                                      *
+   ************************************************************************/
 
   /**
-   * Constructs a Container State Manager that tracks all containers owned by
-   * SCM for the purpose of allocation of blocks.
-   * <p>
-   * TODO : Add Container Tags so we know which containers are owned by SCM.
-   */
-  @SuppressWarnings("unchecked")
-  public ContainerStateManager(final ConfigurationSource configuration) {
-
-    // Initialize the container state machine.
-    final Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
-
-    // These are the steady states of a container.
-    finalStates.add(LifeCycleState.OPEN);
-    finalStates.add(LifeCycleState.CLOSED);
-    finalStates.add(LifeCycleState.DELETED);
-
-    this.stateMachine = new StateMachine<>(LifeCycleState.OPEN,
-        finalStates);
-    initializeStateMachine();
-
-    this.containerSize = (long) configuration.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    this.autoCreateRatisOne = configuration.getBoolean(
-            ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);
-
-    this.lastUsedMap = new ConcurrentHashMap<>();
-    this.containerCount = new AtomicLong(0);
-    this.containers = new ContainerStateMap();
-  }
-
-  /*
-   *
-   * Event and State Transition Mapping:
-   *
-   * State: OPEN         ----------------> CLOSING
-   * Event:                    FINALIZE
-   *
-   * State: CLOSING      ----------------> QUASI_CLOSED
-   * Event:                  QUASI_CLOSE
-   *
-   * State: CLOSING      ----------------> CLOSED
-   * Event:                     CLOSE
-   *
-   * State: QUASI_CLOSED ----------------> CLOSED
-   * Event:                  FORCE_CLOSE
-   *
-   * State: CLOSED       ----------------> DELETING
-   * Event:                    DELETE
-   *
-   * State: DELETING     ----------------> DELETED
-   * Event:                    CLEANUP
-   *
-   *
-   * Container State Flow:
-   *
-   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]
-   *          (FINALIZE)      |      (QUASI_CLOSE)        |
-   *                          |                           |
-   *                          |                           |
-   *                  (CLOSE) |             (FORCE_CLOSE) |
-   *                          |                           |
-   *                          |                           |
-   *                          +--------->[CLOSED]<--------+
-   *                                        |
-   *                                (DELETE)|
-   *                                        |
-   *                                        |
-   *                                   [DELETING]
-   *                                        |
-   *                              (CLEANUP) |
-   *                                        |
-   *                                        V
-   *                                    [DELETED]
    *
    */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(LifeCycleState.OPEN,
-        LifeCycleState.CLOSING,
-        LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSING,
-        LifeCycleState.QUASI_CLOSED,
-        LifeCycleEvent.QUASI_CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSING,
-        LifeCycleState.CLOSED,
-        LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.QUASI_CLOSED,
-        LifeCycleState.CLOSED,
-        LifeCycleEvent.FORCE_CLOSE);
-
-    stateMachine.addTransition(LifeCycleState.CLOSED,
-        LifeCycleState.DELETING,
-        LifeCycleEvent.DELETE);
-
-    stateMachine.addTransition(LifeCycleState.DELETING,
-        LifeCycleState.DELETED,
-        LifeCycleEvent.CLEANUP);
-  }
-
-
-  void loadContainer(final ContainerInfo containerInfo) throws SCMException {
-    containers.addContainer(containerInfo);
-    containerCount.set(Long.max(
-        containerInfo.getContainerID(), containerCount.get()));
-    containerStateCount.incrementAndGet(containerInfo.getState());
-  }
+  boolean contains(HddsProtos.ContainerID containerID);
 
   /**
-   * Allocates a new container based on the type, replication etc.
+   * Returns the ID of all the managed containers.
+   *
+   * @return Set of {@link ContainerID}
    */
-  ContainerInfo allocateContainer(final PipelineManager pipelineManager,
-      final ReplicationConfig replicationConfig, final String owner)
-      throws IOException {
-    final List<Pipeline> pipelines = pipelineManager
-        .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
-    Pipeline pipeline;
-
-    boolean bgCreateOne = RatisReplicationConfig
-        .hasFactor(replicationConfig, ReplicationFactor.ONE)
-        && autoCreateRatisOne;
-    boolean bgCreateThree = RatisReplicationConfig
-        .hasFactor(replicationConfig, ReplicationFactor.THREE);
-
-    if (!pipelines.isEmpty() && (bgCreateOne || bgCreateThree)) {
-      // let background create Ratis pipelines.
-      pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
-    } else {
-      try {
-        pipeline = pipelineManager.createPipeline(replicationConfig);
-        pipelineManager.waitPipelineReady(pipeline.getId(), 0);
-      } catch (IOException e) {
-
-        if (pipelines.isEmpty()) {
-          throw new IOException("Could not allocate container. Cannot get any" 
+
-              " matching pipeline for replicationConfig:" + replicationConfig +
-              ", State:PipelineState.OPEN");
-        }
-        pipeline = pipelines.get((int) containerCount.get() % 
pipelines.size());
-      }
-    }
-
-    synchronized (pipeline) {
-      return allocateContainer(pipelineManager, owner, pipeline);
-    }
-  }
+  Set<ContainerID> getContainerIDs();
 
   /**
-   * Allocates a new container based on the type, replication etc.
-   * This method should be called only after the lock on the pipeline is held
-   * on which the container will be allocated.
    *
-   * @param pipelineManager   - Pipeline Manager class.
-   * @param owner             - Owner of the container.
-   * @param pipeline          - Pipeline to which the container needs to be
-   *                          allocated.
-   * @return ContainerWithPipeline
-   * @throws IOException on Failure.
    */
-  ContainerInfo allocateContainer(
-      final PipelineManager pipelineManager, final String owner,
-      Pipeline pipeline) throws IOException {
-    Preconditions.checkNotNull(pipeline,
-        "Pipeline couldn't be found for the new container. "
-            + "Do you have enough nodes?");
-
-    final long containerID = containerCount.incrementAndGet();
-    final ContainerInfo containerInfo = new ContainerInfo.Builder()
-        .setState(LifeCycleState.OPEN)
-        .setPipelineID(pipeline.getId())
-        .setUsedBytes(0)
-        .setNumberOfKeys(0)
-        .setStateEnterTime(Time.now())
-        .setOwner(owner)
-        .setContainerID(containerID)
-        .setDeleteTransactionId(0)
-        .setReplicationConfig(pipeline.getReplicationConfig())
-        .build();
-    addContainerInfo(containerID, containerInfo, pipelineManager, pipeline);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("New container allocated: {}", containerInfo);
-    }
-    return containerInfo;
-  }
-
-  public void addContainerInfo(long containerID,
-                               ContainerInfo containerInfo,
-                               PipelineManager pipelineManager,
-                               Pipeline pipeline) throws IOException {
-    Preconditions.checkNotNull(containerInfo);
-    containers.addContainer(containerInfo);
-    if (pipeline != null) {
-      // In Recon, while adding a 'new' CLOSED container, pipeline will be a
-      // random ID, and hence be passed down as null.
-      pipelineManager.addContainerToPipeline(pipeline.getId(),
-          ContainerID.valueOf(containerID));
-    }
-    containerStateCount.incrementAndGet(containerInfo.getState());
-  }
+  Set<ContainerID> getContainerIDs(LifeCycleState state);
 
   /**
-   * Update the Container State to the next state.
    *
-   * @param containerID - ContainerID
-   * @param event - LifeCycle Event
-   * @throws SCMException  on Failure.
    */
-  void updateContainerState(final ContainerID containerID,
-      final HddsProtos.LifeCycleEvent event)
-      throws SCMException, ContainerNotFoundException {
-    final ContainerInfo info = containers.getContainerInfo(containerID);
-    try {
-      final LifeCycleState oldState = info.getState();
-      final LifeCycleState newState = stateMachine.getNextState(
-          info.getState(), event);
-      containers.updateState(containerID, info.getState(), newState);
-      containerStateCount.incrementAndGet(newState);
-      containerStateCount.decrementAndGet(oldState);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update container state %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          containerID, info.getState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
-    }
-  }
+  ContainerInfo getContainer(HddsProtos.ContainerID id);
 
   /**
-   * Update deleteTransactionId for a container.
    *
-   * @param deleteTransactionMap maps containerId to its new
-   *                             deleteTransactionID
    */
-  void updateDeleteTransactionId(
-      final Map<Long, Long> deleteTransactionMap) {
-    deleteTransactionMap.forEach((k, v) -> {
-      containers.getContainerInfo(ContainerID.valueOf(k))
-          .updateDeleteTransactionId(v);
-    });
-  }
-
+  Set<ContainerReplica> getContainerReplicas(HddsProtos.ContainerID id);
 
   /**
-   * Return a container matching the attributes specified.
    *
-   * @param size         - Space needed in the Container.
-   * @param owner        - Owner of the container - A specific nameservice.
-   * @param pipelineID   - ID of the pipeline
-   * @param containerIDs - Set of containerIDs to choose from
-   * @return ContainerInfo, null if there is no match found.
    */
-  ContainerInfo getMatchingContainer(final long size, String owner,
-      PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
-    if (containerIDs.isEmpty()) {
-      return null;
-    }
-
-    // Get the last used container and find container above the last used
-    // container ID.
-    final ContainerState key = new ContainerState(owner, pipelineID);
-    final ContainerID lastID =
-        lastUsedMap.getOrDefault(key, containerIDs.first());
-
-    // There is a small issue here. The first time, we will skip the first
-    // container. But in most cases it will not matter.
-    NavigableSet<ContainerID> resultSet = containerIDs.tailSet(lastID, false);
-    if (resultSet.size() == 0) {
-      resultSet = containerIDs;
-    }
-
-    ContainerInfo selectedContainer =
-        findContainerWithSpace(size, resultSet, owner, pipelineID);
-    if (selectedContainer == null) {
-
-      // If we did not find any space in the tailSet, we need to look for
-      // space in the headset, we need to pass true to deal with the
-      // situation that we have a lone container that has space. That is we
-      // ignored the last used container under the assumption we can find
-      // other containers with space, but if have a single container that is
-      // not true. Hence we need to include the last used container as the
-      // last element in the sorted set.
-
-      resultSet = containerIDs.headSet(lastID, true);
-      selectedContainer =
-          findContainerWithSpace(size, resultSet, owner, pipelineID);
-    }
-
-    return selectedContainer;
-  }
-
-  private ContainerInfo findContainerWithSpace(final long size,
-      final NavigableSet<ContainerID> searchSet, final String owner,
-      final PipelineID pipelineID) {
-    // Get the container with space to meet our request.
-    for (ContainerID id : searchSet) {
-      final ContainerInfo containerInfo = containers.getContainerInfo(id);
-      if (containerInfo.getUsedBytes() + size <= this.containerSize) {
-        containerInfo.updateLastUsedTime();
-        return containerInfo;
-      }
-    }
-    return null;
-  }
-
-  Set<ContainerID> getAllContainerIDs() {
-    return containers.getAllContainerIDs();
-  }
+  void updateContainerReplica(HddsProtos.ContainerID id,
+                              ContainerReplica replica);
 
   /**
-   * Returns Containers by State.
    *
-   * @param state - State - Open, Closed etc.
-   * @return List of containers by state.
    */
-  Set<ContainerID> getContainerIDsByState(final LifeCycleState state) {
-    return containers.getContainerIDsByState(state);
-  }
+  void removeContainerReplica(HddsProtos.ContainerID id,
+                              ContainerReplica replica);
 
   /**
-   * Get count of containers in the current {@link LifeCycleState}.
    *
-   * @param state {@link LifeCycleState}
-   * @return Count of containers
    */
-  Integer getContainerCountByState(final LifeCycleState state) {
-    return Long.valueOf(containerStateCount.get(state)).intValue();
-  }
+  @Replicate
+  void addContainer(ContainerInfoProto containerInfo)
+      throws IOException;
 
   /**
-   * Returns a set of ContainerIDs that match the Container.
    *
-   * @param owner  Owner of the Containers.
-   * @param repConfig - Replication Config of the containers
-   * @param state - Current State, like Open, Close etc.
-   * @return Set of containers that match the specific query parameters.
    */
-  NavigableSet<ContainerID> getMatchingContainerIDs(final String owner,
-      final ReplicationConfig repConfig, final LifeCycleState state) {
-    return containers.getMatchingContainerIDs(state, owner, repConfig);
-  }
+  @Replicate
+  void updateContainerState(HddsProtos.ContainerID id,
+                            HddsProtos.LifeCycleEvent event)
+      throws IOException, InvalidStateTransitionException;
 
   /**
-   * Returns the containerInfo for the given container id.
-   * @param containerID id of the container
-   * @return ContainerInfo containerInfo
-   * @throws IOException
+   *
    */
-  ContainerInfo getContainer(final ContainerID containerID)
-      throws ContainerNotFoundException {
-    final ContainerInfo container = containers.getContainerInfo(containerID);
-    if (container != null) {
-      return container;
-    }
-    throw new ContainerNotFoundException(containerID.toString());
-  }
-
-  void close() throws IOException {
-  }
+  // TODO: Make this @Replicate
+  void updateDeleteTransactionId(Map<ContainerID, Long> deleteTransactionMap)
+      throws IOException;
 
   /**
-   * Returns the latest list of DataNodes where replica for given containerId
-   * exist. Throws an SCMException if no entry is found for given containerId.
    *
-   * @param containerID
-   * @return Set<DatanodeDetails>
    */
-  Set<ContainerReplica> getContainerReplicas(
-      final ContainerID containerID) throws ContainerNotFoundException {
-    return containers.getContainerReplicas(containerID);
-  }
+  ContainerInfo getMatchingContainer(long size, String owner,
+                                     PipelineID pipelineID,
+                                     NavigableSet<ContainerID> containerIDs);
 
   /**
-   * Add a container Replica for given DataNode.
    *
-   * @param containerID
-   * @param replica
    */
-  void updateContainerReplica(final ContainerID containerID,
-      final ContainerReplica replica) throws ContainerNotFoundException {
-    containers.updateContainerReplica(containerID, replica);
-  }
+  @Replicate
+  void removeContainer(HddsProtos.ContainerID containerInfo)
+      throws IOException;
 
   /**
-   * Remove a container Replica for given DataNode.
-   *
-   * @param containerID
-   * @param replica
-   * @return True of dataNode is removed successfully else false.
+   * Reinitialize the ContainerStateManager with container store.
+   * @param containerStore container table.
+   * @throws IOException
    */
-  void removeContainerReplica(final ContainerID containerID,
-      final ContainerReplica replica)
-      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
-    containers.removeContainerReplica(containerID, replica);
-  }
-
-  void removeContainer(final ContainerID containerID)
-      throws ContainerNotFoundException {
-    if (containers.getContainerInfo(containerID) == null) {
-      throw new ContainerNotFoundException(containerID.toString());
-    }
-    containers.removeContainer(containerID);
-  }
+  void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
+      throws IOException;
 
   /**
-   * Update the lastUsedmap to update with ContainerState and containerID.
-   * @param pipelineID
-   * @param containerID
-   * @param owner
+   *
    */
-  public synchronized void updateLastUsedMap(PipelineID pipelineID,
-      ContainerID containerID, String owner) {
-    lastUsedMap.put(new ContainerState(owner, pipelineID),
-        containerID);
-  }
-
+  void close() throws IOException;
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 4777d49..b8577fd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -79,7 +79,7 @@ import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DE
  * This class is NOT thread safe. All the calls are idempotent.
  */
 public final class ContainerStateManagerImpl
-    implements ContainerStateManagerV2 {
+    implements ContainerStateManager {
 
   /**
    * Logger instance of ContainerStateManagerImpl.
@@ -561,21 +561,21 @@ public final class ContainerStateManagerImpl
       return this;
     }
 
-    public ContainerStateManagerV2 build() throws IOException {
+    public ContainerStateManager build() throws IOException {
       Preconditions.checkNotNull(conf);
       Preconditions.checkNotNull(pipelineMgr);
       Preconditions.checkNotNull(table);
 
-      final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
+      final ContainerStateManager csm = new ContainerStateManagerImpl(
           conf, pipelineMgr, table, transactionBuffer);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
               scmRatisServer);
 
-      return (ContainerStateManagerV2) Proxy.newProxyInstance(
+      return (ContainerStateManager) Proxy.newProxyInstance(
           SCMHAInvocationHandler.class.getClassLoader(),
-          new Class<?>[]{ContainerStateManagerV2.class}, invocationHandler);
+          new Class<?>[]{ContainerStateManager.class}, invocationHandler);
     }
 
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
deleted file mode 100644
index 276e21c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.container;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-
-/**
- * A ContainerStateManager is responsible for keeping track of all the
- * container and its state inside SCM, it also exposes methods to read and
- * modify the container and its state.
- *
- * All the mutation operations are marked with {@link Replicate} annotation so
- * that when SCM-HA is enabled, the mutations are replicated from leader SCM
- * to the followers.
- *
- * When a method is marked with {@link Replicate} annotation it should follow
- * the below rules.
- *
- * 1. The method call should be Idempotent
- * 2. Arguments should be of protobuf objects
- * 3. Return type should be of protobuf object
- * 4. The declaration should throw RaftException
- *
- */
-public interface ContainerStateManagerV2 {
-
-  //TODO: Rename this to ContainerStateManager
-
-  /* **********************************************************************
-   * Container Life Cycle                                                 *
-   *                                                                      *
-   * Event and State Transition Mapping:                                  *
-   *                                                                      *
-   * State: OPEN         ----------------> CLOSING                        *
-   * Event:                    FINALIZE                                   *
-   *                                                                      *
-   * State: CLOSING      ----------------> QUASI_CLOSED                   *
-   * Event:                  QUASI_CLOSE                                  *
-   *                                                                      *
-   * State: CLOSING      ----------------> CLOSED                         *
-   * Event:                     CLOSE                                     *
-   *                                                                      *
-   * State: QUASI_CLOSED ----------------> CLOSED                         *
-   * Event:                  FORCE_CLOSE                                  *
-   *                                                                      *
-   * State: CLOSED       ----------------> DELETING                       *
-   * Event:                    DELETE                                     *
-   *                                                                      *
-   * State: DELETING     ----------------> DELETED                        *
-   * Event:                    CLEANUP                                    *
-   *                                                                      *
-   *                                                                      *
-   * Container State Flow:                                                *
-   *                                                                      *
-   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]        *
-   *          (FINALIZE)      |      (QUASI_CLOSE)        |               *
-   *                          |                           |               *
-   *                          |                           |               *
-   *                  (CLOSE) |             (FORCE_CLOSE) |               *
-   *                          |                           |               *
-   *                          |                           |               *
-   *                          +--------->[CLOSED]<--------+               *
-   *                                        |                             *
-   *                                (DELETE)|                             *
-   *                                        |                             *
-   *                                        |                             *
-   *                                   [DELETING]                         *
-   *                                        |                             *
-   *                              (CLEANUP) |                             *
-   *                                        |                             *
-   *                                        V                             *
-   *                                    [DELETED]                         *
-   *                                                                      *
-   ************************************************************************/
-
-  /**
-   *
-   */
-  boolean contains(HddsProtos.ContainerID containerID);
-
-  /**
-   * Returns the ID of all the managed containers.
-   *
-   * @return Set of {@link ContainerID}
-   */
-  Set<ContainerID> getContainerIDs();
-
-  /**
-   *
-   */
-  Set<ContainerID> getContainerIDs(LifeCycleState state);
-
-  /**
-   *
-   */
-  ContainerInfo getContainer(HddsProtos.ContainerID id);
-
-  /**
-   *
-   */
-  Set<ContainerReplica> getContainerReplicas(HddsProtos.ContainerID id);
-
-  /**
-   *
-   */
-  void updateContainerReplica(HddsProtos.ContainerID id,
-                              ContainerReplica replica);
-
-  /**
-   *
-   */
-  void removeContainerReplica(HddsProtos.ContainerID id,
-                              ContainerReplica replica);
-
-  /**
-   *
-   */
-  @Replicate
-  void addContainer(ContainerInfoProto containerInfo)
-      throws IOException;
-
-  /**
-   *
-   */
-  @Replicate
-  void updateContainerState(HddsProtos.ContainerID id,
-                            HddsProtos.LifeCycleEvent event)
-      throws IOException, InvalidStateTransitionException;
-
-  /**
-   *
-   */
-  // Make this as @Replicate
-  void updateDeleteTransactionId(Map<ContainerID, Long> deleteTransactionMap)
-      throws IOException;
-
-  /**
-   *
-   */
-  ContainerInfo getMatchingContainer(long size, String owner,
-                                     PipelineID pipelineID,
-                                     NavigableSet<ContainerID> containerIDs);
-
-  /**
-   *
-   */
-  @Replicate
-  void removeContainer(HddsProtos.ContainerID containerInfo)
-      throws IOException;
-
-  /**
-   * Reinitialize the ContainerStateManager with container store.
-   * @param containerStore container table.
-   * @throws IOException
-   */
-  void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
-      throws IOException;
-
-  /**
-   *
-   */
-  void close() throws IOException;
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 31acfcf..068f3d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -92,6 +92,7 @@ import java.util.concurrent.ThreadLocalRandom;
 public final class TestUtils {
 
   private static ThreadLocalRandom random = ThreadLocalRandom.current();
+  private static PipelineID randomPipelineID = PipelineID.randomId();
 
   private TestUtils() {
   }
@@ -621,7 +622,7 @@ public final class TestUtils {
     return StorageContainerManager.createSCM(conf, configurator);
   }
 
-  public static ContainerInfo getContainer(
+  private static ContainerInfo.Builder getDefaultContainerInfoBuilder(
       final HddsProtos.LifeCycleState state) {
     return new ContainerInfo.Builder()
         .setContainerID(RandomUtils.nextLong())
@@ -629,7 +630,20 @@ public final class TestUtils {
             new RatisReplicationConfig(ReplicationFactor.THREE))
         .setState(state)
         .setSequenceId(10000L)
-        .setOwner("TEST")
+        .setOwner("TEST");
+  }
+
+  public static ContainerInfo getContainer(
+      final HddsProtos.LifeCycleState state) {
+    return getDefaultContainerInfoBuilder(state)
+        .setPipelineID(randomPipelineID)
+        .build();
+  }
+
+  public static ContainerInfo getContainer(
+      final HddsProtos.LifeCycleState state, PipelineID pipelineID) {
+    return getDefaultContainerInfoBuilder(state)
+        .setPipelineID(pipelineID)
         .build();
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 2d92992..cbaf3bf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -16,7 +16,9 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,24 +27,35 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -59,28 +72,49 @@ public class TestContainerReportHandler {
   private ContainerManager containerManager;
   private ContainerStateManager containerStateManager;
   private EventPublisher publisher;
+  private File testDir;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
+  private PipelineManager pipelineManager;
 
   @Before
   public void setup() throws IOException, InvalidStateTransitionException {
-    final ConfigurationSource conf = new OzoneConfiguration();
-    this.nodeManager = new MockNodeManager(true, 10);
-    this.containerManager = Mockito.mock(ContainerManager.class);
-    this.containerStateManager = new ContainerStateManager(conf);
-    this.publisher = Mockito.mock(EventPublisher.class);
-
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
+    nodeManager = new MockNodeManager(true, 10);
+    containerManager = Mockito.mock(ContainerManager.class);
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    nodeManager = new MockNodeManager(true, 10);
+    pipelineManager =
+        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+    containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+    publisher = Mockito.mock(EventPublisher.class);
 
     Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainer((ContainerID)invocation.getArguments()[0]));
+            .getContainer(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     Mockito.when(containerManager.getContainerReplicas(
         Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+            .getContainerReplicas(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     Mockito.doAnswer(invocation -> {
       containerStateManager
-          .updateContainerState((ContainerID)invocation.getArguments()[0],
+          .updateContainerState(((ContainerID)invocation
+                  .getArguments()[0]).getProtobuf(),
               (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
       return null;
     }).when(containerManager).updateContainerState(
@@ -89,7 +123,7 @@ public class TestContainerReportHandler {
 
     Mockito.doAnswer(invocation -> {
       containerStateManager.updateContainerReplica(
-          (ContainerID) invocation.getArguments()[0],
+          ((ContainerID)invocation.getArguments()[0]).getProtobuf(),
           (ContainerReplica) invocation.getArguments()[1]);
       return null;
     }).when(containerManager).updateContainerReplica(
@@ -97,7 +131,7 @@ public class TestContainerReportHandler {
 
     Mockito.doAnswer(invocation -> {
       containerStateManager.removeContainerReplica(
-          (ContainerID) invocation.getArguments()[0],
+          ((ContainerID)invocation.getArguments()[0]).getProtobuf(),
           (ContainerReplica) invocation.getArguments()[1]);
       return null;
     }).when(containerManager).removeContainerReplica(
@@ -106,14 +140,18 @@ public class TestContainerReportHandler {
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     containerStateManager.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
   @Test
   public void testUnderReplicatedContainer()
-      throws NodeNotFoundException, ContainerNotFoundException, SCMException {
-
+      throws NodeNotFoundException, IOException {
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
@@ -132,32 +170,20 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    containerStateManager.loadContainer(containerOne);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
     getReplicas(containerOne.containerID(),
         ContainerReplicaProto.State.CLOSED,
         datanodeOne, datanodeTwo, datanodeThree)
-        .forEach(r -> {
-          try {
-            containerStateManager.updateContainerReplica(
-                containerOne.containerID(), r);
-          } catch (ContainerNotFoundException ignored) {
-
-          }
-        });
+        .forEach(r -> containerStateManager.updateContainerReplica(
+            containerOne.containerID().getProtobuf(), r));
 
     getReplicas(containerTwo.containerID(),
         ContainerReplicaProto.State.CLOSED,
         datanodeOne, datanodeTwo, datanodeThree)
-        .forEach(r -> {
-          try {
-            containerStateManager.updateContainerReplica(
-                containerTwo.containerID(), r);
-          } catch (ContainerNotFoundException ignored) {
-
-          }
-        });
+        .forEach(r -> containerStateManager.updateContainerReplica(
+            containerTwo.containerID().getProtobuf(), r));
 
 
     // SCM expects both containerOne and containerTwo to be in all the three
@@ -180,7 +206,7 @@ public class TestContainerReportHandler {
 
   @Test
   public void testOverReplicatedContainer() throws NodeNotFoundException,
-      SCMException, ContainerNotFoundException {
+      IOException {
 
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
@@ -203,32 +229,20 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    containerStateManager.loadContainer(containerOne);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
     getReplicas(containerOne.containerID(),
         ContainerReplicaProto.State.CLOSED,
         datanodeOne, datanodeTwo, datanodeThree)
-        .forEach(r -> {
-          try {
-            containerStateManager.updateContainerReplica(
-                containerOne.containerID(), r);
-          } catch (ContainerNotFoundException ignored) {
-
-          }
-        });
+        .forEach(r -> containerStateManager.updateContainerReplica(
+            containerOne.containerID().getProtobuf(), r));
 
     getReplicas(containerTwo.containerID(),
         ContainerReplicaProto.State.CLOSED,
         datanodeOne, datanodeTwo, datanodeThree)
-        .forEach(r -> {
-          try {
-            containerStateManager.updateContainerReplica(
-                containerTwo.containerID(), r);
-          } catch (ContainerNotFoundException ignored) {
-
-          }
-        });
+        .forEach(r -> containerStateManager.updateContainerReplica(
+            containerTwo.containerID().getProtobuf(), r));
 
 
     // SCM expects both containerOne and containerTwo to be in all the three
@@ -296,26 +310,16 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    containerStateManager.loadContainer(containerOne);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
-    containerOneReplicas.forEach(r -> {
-      try {
+    containerOneReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+        containerTwo.containerID().getProtobuf(), r));
 
-    containerTwoReplicas.forEach(r -> {
-      try {
+    containerTwoReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+        containerTwo.containerID().getProtobuf(), r));
 
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -325,7 +329,8 @@ public class TestContainerReportHandler {
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED,
+        containerManager.getContainer(containerOne.containerID()).getState());
   }
 
   @Test
@@ -373,26 +378,16 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    containerStateManager.loadContainer(containerOne);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
-    containerOneReplicas.forEach(r -> {
-      try {
+    containerOneReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+        containerTwo.containerID().getProtobuf(), r));
 
-    containerTwoReplicas.forEach(r -> {
-      try {
+    containerTwoReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+        containerTwo.containerID().getProtobuf(), r));
 
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -402,7 +397,8 @@ public class TestContainerReportHandler {
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState());
+    Assert.assertEquals(LifeCycleState.QUASI_CLOSED,
+        containerManager.getContainer(containerOne.containerID()).getState());
   }
 
   @Test
@@ -453,26 +449,16 @@ public class TestContainerReportHandler {
     nodeManager.setContainers(datanodeTwo, containerIDSet);
     nodeManager.setContainers(datanodeThree, containerIDSet);
 
-    containerStateManager.loadContainer(containerOne);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(containerOne.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
-    containerOneReplicas.forEach(r -> {
-      try {
+    containerOneReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
+        containerTwo.containerID().getProtobuf(), r));
 
-      }
-    });
-
-    containerTwoReplicas.forEach(r -> {
-      try {
+    containerTwoReplicas.forEach(r ->
         containerStateManager.updateContainerReplica(
-            containerTwo.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+        containerTwo.containerID().getProtobuf(), r));
 
 
     final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -483,38 +469,47 @@ public class TestContainerReportHandler {
         new ContainerReportFromDatanode(datanodeOne, containerReport);
     reportHandler.onMessage(containerReportFromDatanode, publisher);
 
-    Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED,
+        containerManager.getContainer(containerOne.containerID()).getState());
   }
 
   @Test
   public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
-      throws SCMException {
+      throws IOException {
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
         NodeStatus.inServiceHealthy()).iterator();
 
+    Pipeline pipeline = pipelineManager.createPipeline(
+        new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
+
     final DatanodeDetails datanodeOne = nodeIterator.next();
     final DatanodeDetails datanodeTwo = nodeIterator.next();
     final DatanodeDetails datanodeThree = nodeIterator.next();
 
     final ContainerReplicaProto.State replicaState
         = ContainerReplicaProto.State.OPEN;
-    final ContainerInfo containerOne = getContainer(LifeCycleState.OPEN);
+    final ContainerInfo containerOne =
+        getContainer(LifeCycleState.OPEN, pipeline.getId());
 
-    containerStateManager.loadContainer(containerOne);
+    containerStateManager.addContainer(containerOne.getProtobuf());
     // Container loaded, no replicas reported from DNs. Expect zeros for
     // usage values.
-    assertEquals(0L, containerOne.getUsedBytes());
-    assertEquals(0L, containerOne.getNumberOfKeys());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+            .getUsedBytes());
+    assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+            .getNumberOfKeys());
 
     reportHandler.onMessage(getContainerReportFromDatanode(
         containerOne.containerID(), replicaState,
         datanodeOne, 50L, 60L), publisher);
 
     // Single replica reported - ensure values are updated
-    assertEquals(50L, containerOne.getUsedBytes());
-    assertEquals(60L, containerOne.getNumberOfKeys());
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     reportHandler.onMessage(getContainerReportFromDatanode(
         containerOne.containerID(), replicaState,
@@ -524,8 +519,10 @@ public class TestContainerReportHandler {
         datanodeThree, 50L, 60L), publisher);
 
     // All 3 DNs are reporting the same values. Counts should be as expected.
-    assertEquals(50L, containerOne.getUsedBytes());
-    assertEquals(60L, containerOne.getNumberOfKeys());
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     // Now each DN reports a different lesser value. Counts should be the min
     // reported.
@@ -541,8 +538,10 @@ public class TestContainerReportHandler {
 
     // All 3 DNs are reporting different values. The actual value should be the
     // minimum.
-    assertEquals(1L, containerOne.getUsedBytes());
-    assertEquals(10L, containerOne.getNumberOfKeys());
+    assertEquals(1L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(10L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     // Have the lowest value report a higher value and ensure the new value
     // is the minimum
@@ -550,13 +549,15 @@ public class TestContainerReportHandler {
         containerOne.containerID(), replicaState,
         datanodeOne, 3L, 12L), publisher);
 
-    assertEquals(2L, containerOne.getUsedBytes());
-    assertEquals(11L, containerOne.getNumberOfKeys());
+    assertEquals(2L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(11L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
   }
 
   @Test
   public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
-      throws SCMException {
+      throws IOException {
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
     final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
@@ -570,7 +571,7 @@ public class TestContainerReportHandler {
         = ContainerReplicaProto.State.CLOSED;
     final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
 
-    containerStateManager.loadContainer(containerOne);
+    containerStateManager.addContainer(containerOne.getProtobuf());
     // Container loaded, no replicas reported from DNs. Expect zeros for
     // usage values.
     assertEquals(0L, containerOne.getUsedBytes());
@@ -581,8 +582,10 @@ public class TestContainerReportHandler {
         datanodeOne, 50L, 60L), publisher);
 
     // Single replica reported - ensure values are updated
-    assertEquals(50L, containerOne.getUsedBytes());
-    assertEquals(60L, containerOne.getNumberOfKeys());
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     reportHandler.onMessage(getContainerReportFromDatanode(
         containerOne.containerID(), replicaState,
@@ -592,8 +595,10 @@ public class TestContainerReportHandler {
         datanodeThree, 50L, 60L), publisher);
 
     // All 3 DNs are reporting the same values. Counts should be as expected.
-    assertEquals(50L, containerOne.getUsedBytes());
-    assertEquals(60L, containerOne.getNumberOfKeys());
+    assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     // Now each DN reports a different lesser value. Counts should be the max
     // reported.
@@ -609,8 +614,10 @@ public class TestContainerReportHandler {
 
     // All 3 DNs are reporting different values. The actual value should be the
     // maximum.
-    assertEquals(3L, containerOne.getUsedBytes());
-    assertEquals(12L, containerOne.getNumberOfKeys());
+    assertEquals(3L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(12L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
 
     // Have the highest value report a lower value and ensure the new value
     // is the new maximumu
@@ -618,13 +625,15 @@ public class TestContainerReportHandler {
         containerOne.containerID(), replicaState,
         datanodeThree, 1L, 10L), publisher);
 
-    assertEquals(2L, containerOne.getUsedBytes());
-    assertEquals(11L, containerOne.getNumberOfKeys());
+    assertEquals(2L, containerManager.getContainer(containerOne.containerID())
+        .getUsedBytes());
+    assertEquals(11L, containerManager.getContainer(containerOne.containerID())
+        .getNumberOfKeys());
   }
 
   @Test
   public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException,
-      SCMException, ContainerNotFoundException {
+      IOException {
 
     final ContainerReportHandler reportHandler = new ContainerReportHandler(
         nodeManager, containerManager);
@@ -639,7 +648,7 @@ public class TestContainerReportHandler {
         .collect(Collectors.toSet());
 
     nodeManager.setContainers(datanodeOne, containerIDSet);
-    containerStateManager.loadContainer(containerOne);
+    containerStateManager.addContainer(containerOne.getProtobuf());
 
     // Expects the replica will be deleted.
     final ContainerReportsProto containerReport = getContainerReportsProto(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 11c18a4..1bd07f7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -17,21 +17,34 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Set;
+import java.util.UUID;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,12 +58,51 @@ import static org.mockito.Mockito.when;
 public class TestContainerStateManager {
 
   private ContainerStateManager containerStateManager;
+  private PipelineManager pipelineManager;
+  private SCMHAManager scmhaManager;
+  private File testDir;
+  private DBStore dbStore;
+  private Pipeline pipeline;
 
   @Before
   public void init() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
-    containerStateManager = new ContainerStateManager(conf);
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    pipelineManager = Mockito.mock(PipelineManager.class);
+    pipeline = Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED)
+            .setId(PipelineID.randomId())
+            .setReplicationConfig(new StandaloneReplicationConfig(
+                ReplicationFactor.THREE))
+            .setNodes(new ArrayList<>()).build();
+    when(pipelineManager.createPipeline(new StandaloneReplicationConfig(
+        ReplicationFactor.THREE))).thenReturn(pipeline);
+    when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
+        .thenReturn(true);
+
+
+    containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+  }
 
+  @After
+  public void tearDown() throws Exception {
+    containerStateManager.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
   @Test
@@ -68,7 +120,7 @@ public class TestContainerStateManager {
 
     //WHEN
     Set<ContainerReplica> replicas = containerStateManager
-        .getContainerReplicas(c1.containerID());
+        .getContainerReplicas(c1.containerID().getProtobuf());
 
     //THEN
     Assert.assertEquals(3, replicas.size());
@@ -88,41 +140,38 @@ public class TestContainerStateManager {
 
     //WHEN
     Set<ContainerReplica> replicas = containerStateManager
-        .getContainerReplicas(c1.containerID());
+        .getContainerReplicas(c1.containerID().getProtobuf());
 
     Assert.assertEquals(2, replicas.size());
     Assert.assertEquals(3, c1.getReplicationConfig().getRequiredNodes());
   }
 
-  private void addReplica(ContainerInfo cont, DatanodeDetails node)
-      throws ContainerNotFoundException {
+  private void addReplica(ContainerInfo cont, DatanodeDetails node) {
     ContainerReplica replica = ContainerReplica.newBuilder()
         .setContainerID(cont.containerID())
         .setContainerState(ContainerReplicaProto.State.CLOSED)
         .setDatanodeDetails(node)
         .build();
     containerStateManager
-        .updateContainerReplica(cont.containerID(), replica);
+        .updateContainerReplica(cont.containerID().getProtobuf(), replica);
   }
 
   private ContainerInfo allocateContainer() throws IOException {
 
-    PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-
-    Pipeline pipeline =
-        Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED)
-            .setId(PipelineID.randomId())
-            .setReplicationConfig(new StandaloneReplicationConfig(
-                ReplicationFactor.THREE))
-            .setNodes(new ArrayList<>()).build();
-
-    when(pipelineManager.createPipeline(new StandaloneReplicationConfig(
-        ReplicationFactor.THREE))).thenReturn(pipeline);
-
-    return containerStateManager.allocateContainer(pipelineManager,
-        new StandaloneReplicationConfig(
-            ReplicationFactor.THREE), "root");
+    final ContainerInfo containerInfo = new ContainerInfo.Builder()
+        .setState(HddsProtos.LifeCycleState.OPEN)
+        .setPipelineID(pipeline.getId())
+        .setUsedBytes(0)
+        .setNumberOfKeys(0)
+        .setStateEnterTime(Time.now())
+        .setOwner("root")
+        .setContainerID(1)
+        .setDeleteTransactionId(0)
+        .setReplicationConfig(pipeline.getReplicationConfig())
+        .build();
 
+    containerStateManager.addContainer(containerInfo.getProtobuf());
+    return containerInfo;
   }
 
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 0aa8e5f..e8d4b38 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,12 +28,17 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .IncrementalContainerReportFromDatanode;
@@ -40,6 +46,8 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
@@ -48,6 +56,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -75,6 +84,10 @@ public class TestIncrementalContainerReportHandler {
   private EventPublisher publisher;
   private HDDSLayoutVersionManager versionManager;
   private SCMContext scmContext = SCMContext.emptyContext();
+  private PipelineManager pipelineManager;
+  private File testDir;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
 
   @Before
   public void setup() throws IOException, InvalidStateTransitionException {
@@ -96,22 +109,40 @@ public class TestIncrementalContainerReportHandler {
     this.nodeManager =
         new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap,
             scmContext, versionManager);
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+
+    pipelineManager =
+        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+
+    this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
 
-    this.containerStateManager = new ContainerStateManager(conf);
     this.publisher = Mockito.mock(EventPublisher.class);
 
     Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainer((ContainerID)invocation.getArguments()[0]));
+            .getContainer(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     Mockito.when(containerManager.getContainerReplicas(
         Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+            .getContainerReplicas(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     Mockito.doAnswer(invocation -> {
       containerStateManager
-          .removeContainerReplica((ContainerID)invocation.getArguments()[0],
+          .removeContainerReplica(((ContainerID)invocation
+                  .getArguments()[0]).getProtobuf(),
               (ContainerReplica)invocation.getArguments()[1]);
       return null;
     }).when(containerManager).removeContainerReplica(
@@ -120,7 +151,8 @@ public class TestIncrementalContainerReportHandler {
 
     Mockito.doAnswer(invocation -> {
       containerStateManager
-          .updateContainerState((ContainerID)invocation.getArguments()[0],
+          .updateContainerState(((ContainerID)invocation
+                  .getArguments()[0]).getProtobuf(),
               (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
       return null;
     }).when(containerManager).updateContainerState(
@@ -129,7 +161,8 @@ public class TestIncrementalContainerReportHandler {
 
     Mockito.doAnswer(invocation -> {
       containerStateManager
-          .updateContainerReplica((ContainerID)invocation.getArguments()[0],
+          .updateContainerReplica(((ContainerID)invocation
+                  .getArguments()[0]).getProtobuf(),
               (ContainerReplica) invocation.getArguments()[1]);
       return null;
     }).when(containerManager).updateContainerReplica(
@@ -139,8 +172,13 @@ public class TestIncrementalContainerReportHandler {
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     containerStateManager.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
 
@@ -161,15 +199,9 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.CLOSING,
         datanodeOne, datanodeTwo, datanodeThree);
 
-    containerStateManager.loadContainer(container);
-    containerReplicas.forEach(r -> {
-      try {
-        containerStateManager.updateContainerReplica(
-            container.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+    containerStateManager.addContainer(container.getProtobuf());
+    containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+        container.containerID().getProtobuf(), r));
 
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
@@ -179,7 +211,8 @@ public class TestIncrementalContainerReportHandler {
         new IncrementalContainerReportFromDatanode(
             datanodeOne, containerReport);
     reportHandler.onMessage(icrFromDatanode, publisher);
-    Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED,
+        containerManager.getContainer(container.containerID()).getState());
   }
 
   @Test
@@ -199,15 +232,9 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.CLOSING,
         datanodeOne, datanodeTwo, datanodeThree);
 
-    containerStateManager.loadContainer(container);
-    containerReplicas.forEach(r -> {
-      try {
-        containerStateManager.updateContainerReplica(
-            container.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+    containerStateManager.addContainer(container.getProtobuf());
+    containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+        container.containerID().getProtobuf(), r));
 
 
     final IncrementalContainerReportProto containerReport =
@@ -218,7 +245,8 @@ public class TestIncrementalContainerReportHandler {
         new IncrementalContainerReportFromDatanode(
             datanodeOne, containerReport);
     reportHandler.onMessage(icrFromDatanode, publisher);
-    Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState());
+    Assert.assertEquals(LifeCycleState.QUASI_CLOSED,
+        containerManager.getContainer(container.containerID()).getState());
   }
 
   @Test
@@ -242,15 +270,9 @@ public class TestIncrementalContainerReportHandler {
         ContainerReplicaProto.State.QUASI_CLOSED,
         datanodeThree));
 
-    containerStateManager.loadContainer(container);
-    containerReplicas.forEach(r -> {
-      try {
-        containerStateManager.updateContainerReplica(
-            container.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+    containerStateManager.addContainer(container.getProtobuf());
+    containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+        container.containerID().getProtobuf(), r));
 
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
@@ -260,7 +282,8 @@ public class TestIncrementalContainerReportHandler {
         new IncrementalContainerReportFromDatanode(
             datanodeOne, containerReport);
     reportHandler.onMessage(icr, publisher);
-    Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
+    Assert.assertEquals(LifeCycleState.CLOSED,
+        containerManager.getContainer(container.containerID()).getState());
   }
 
   @Test
@@ -280,17 +303,11 @@ public class TestIncrementalContainerReportHandler {
         CLOSED,
         datanodeOne, datanodeTwo, datanodeThree);
 
-    containerStateManager.loadContainer(container);
-    containerReplicas.forEach(r -> {
-      try {
-        containerStateManager.updateContainerReplica(
-            container.containerID(), r);
-      } catch (ContainerNotFoundException ignored) {
-
-      }
-    });
+    containerStateManager.addContainer(container.getProtobuf());
+    containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+        container.containerID().getProtobuf(), r));
     Assert.assertEquals(3, containerStateManager
-        .getContainerReplicas(container.containerID()).size());
+        .getContainerReplicas(container.containerID().getProtobuf()).size());
     final IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(container.containerID(),
             ContainerReplicaProto.State.DELETED,
@@ -300,7 +317,7 @@ public class TestIncrementalContainerReportHandler {
             datanodeOne, containerReport);
     reportHandler.onMessage(icr, publisher);
     Assert.assertEquals(2, containerStateManager
-        .getContainerReplicas(container.containerID()).size());
+        .getContainerReplicas(container.containerID().getProtobuf()).size());
   }
 
   @Test
@@ -319,8 +336,8 @@ public class TestIncrementalContainerReportHandler {
     final DatanodeDetails datanode = randomDatanodeDetails();
     nodeManager.register(datanode, null, null);
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.loadContainer(containerTwo);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.addContainer(containerTwo.getProtobuf());
 
     Assert.assertEquals(0, nodeManager.getContainers(datanode).size());
 
@@ -359,7 +376,8 @@ public class TestIncrementalContainerReportHandler {
           // If we find "container" in the NM, then we must also have it in
           // Container Manager.
           Assert.assertEquals(1, containerStateManager
-              .getContainerReplicas(container.containerID()).size());
+              .getContainerReplicas(container.containerID().getProtobuf())
+              .size());
           Assert.assertEquals(2, nmContainers.size());
         } else {
           // If the race condition occurs as mentioned in HDDS-5249, then this
@@ -367,11 +385,13 @@ public class TestIncrementalContainerReportHandler {
           // NM, but have found something for it in ContainerManager, and that
           // should not happen. It should be in both, or neither.
           Assert.assertEquals(0, containerStateManager
-              .getContainerReplicas(container.containerID()).size());
+              .getContainerReplicas(container.containerID().getProtobuf())
+              .size());
           Assert.assertEquals(1, nmContainers.size());
         }
         Assert.assertEquals(1, containerStateManager
-            .getContainerReplicas(containerTwo.containerID()).size());
+            .getContainerReplicas(containerTwo.containerID().getProtobuf())
+            .size());
       }
     } finally {
       executor.shutdown();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index b4dc1fe..1fd06b6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.primitives.Longs;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.protocol.proto
@@ -36,13 +38,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.TestClock;
@@ -90,6 +94,7 @@ import static org.apache.hadoop.hdds.protocol.proto
 import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.mockito.Mockito.when;
 
 /**
  * Test cases to verify the functionality of ReplicationManager.
@@ -108,10 +113,13 @@ public class TestReplicationManager {
   private TestClock clock;
   private File testDir;
   private DBStore dbStore;
+  private PipelineManager pipelineManager;
+  private SCMHAManager scmhaManager;
 
   @Before
   public void setup()
-      throws IOException, InterruptedException, NodeNotFoundException {
+      throws IOException, InterruptedException,
+      NodeNotFoundException, InvalidStateTransitionException {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setTimeDuration(
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
@@ -121,7 +129,22 @@ public class TestReplicationManager {
     containerManager = Mockito.mock(ContainerManager.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
-    containerStateManager = new ContainerStateManager(conf);
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    pipelineManager = Mockito.mock(PipelineManager.class);
+    when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
+        .thenReturn(true);
+    containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
     serviceManager = new SCMServiceManager();
 
     datanodeCommandHandler = new DatanodeCommandHandler();
@@ -129,29 +152,31 @@ public class TestReplicationManager {
 
     Mockito.when(containerManager.getContainers())
         .thenAnswer(invocation -> {
-          Set<ContainerID> ids = containerStateManager.getAllContainerIDs();
+          Set<ContainerID> ids = containerStateManager.getContainerIDs();
           List<ContainerInfo> containers = new ArrayList<>();
           for (ContainerID id : ids) {
-            containers.add(containerStateManager.getContainer(id));
+            containers.add(containerStateManager.getContainer(
+                id.getProtobuf()));
           }
           return containers;
         });
 
     Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainer((ContainerID)invocation.getArguments()[0]));
+            .getContainer(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     Mockito.when(containerManager.getContainerReplicas(
         Mockito.any(ContainerID.class)))
         .thenAnswer(invocation -> containerStateManager
-            .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+            .getContainerReplicas(((ContainerID)invocation
+                .getArguments()[0]).getProtobuf()));
 
     containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
 
     Mockito.when(containerPlacementPolicy.chooseDatanodes(
-        Mockito.any(),
-        Mockito.any(),
-        Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong()))
+        Mockito.any(), Mockito.any(), Mockito.anyInt(),
+            Mockito.anyLong(), Mockito.anyLong()))
         .thenAnswer(invocation -> {
           int count = (int) invocation.getArguments()[2];
           return IntStream.range(0, count)
@@ -202,6 +227,15 @@ public class TestReplicationManager {
     Thread.sleep(100L);
   }
 
+  @After
+  public void tearDown() throws Exception {
+    containerStateManager.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
+  }
 
   /**
    * Checks if restarting of replication manager works.
@@ -224,9 +258,9 @@ public class TestReplicationManager {
    * any action on OPEN containers.
    */
   @Test
-  public void testOpenContainer() throws SCMException, InterruptedException {
+  public void testOpenContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.OPEN);
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     replicationManager.processAll();
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -238,12 +272,11 @@ public class TestReplicationManager {
    * to all the datanodes.
    */
   @Test
-  public void testClosingContainer() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testClosingContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final ContainerID id = container.containerID();
 
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
 
     // Two replicas in CLOSING state
     final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSING,
@@ -255,7 +288,7 @@ public class TestReplicationManager {
     replicas.addAll(getReplicas(id, State.OPEN, datanode));
 
     for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     final int currentCloseCommandCount = datanodeCommandHandler
@@ -268,7 +301,7 @@ public class TestReplicationManager {
 
     // Update the OPEN to CLOSING
     for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     replicationManager.processAll();
@@ -284,8 +317,7 @@ public class TestReplicationManager {
    * datanodes.
    */
   @Test
-  public void testQuasiClosedContainerWithTwoOpenReplica() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -297,10 +329,11 @@ public class TestReplicationManager {
     final ContainerReplica replicaThree = getReplicas(
         id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
 
     final int currentCloseCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
@@ -323,8 +356,7 @@ public class TestReplicationManager {
    * the container, ReplicationManager will not do anything.
    */
   @Test
-  public void testHealthyQuasiClosedContainer() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testHealthyQuasiClosedContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -335,10 +367,11 @@ public class TestReplicationManager {
     final ContainerReplica replicaThree = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
 
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
@@ -358,8 +391,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testQuasiClosedContainerWithUnhealthyReplica()
-      throws SCMException, ContainerNotFoundException, InterruptedException,
-      ContainerReplicaNotFoundException {
+      throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     container.setUsedBytes(100);
     final ContainerID id = container.containerID();
@@ -371,10 +403,11 @@ public class TestReplicationManager {
     final ContainerReplica replicaThree = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
 
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -391,7 +424,8 @@ public class TestReplicationManager {
     final ContainerReplica unhealthyReplica = getReplicas(
         id, State.UNHEALTHY, 1000L, originNodeId,
         replicaOne.getDatanodeDetails());
-    containerStateManager.updateContainerReplica(id, unhealthyReplica);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), unhealthyReplica);
 
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -404,7 +438,7 @@ public class TestReplicationManager {
         replicationManager.getMetrics().getNumDeletionCmdsSent());
 
     // Now we will delete the unhealthy replica from in-memory.
-    containerStateManager.removeContainerReplica(id, replicaOne);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), replicaOne);
 
     final long currentBytesToReplicate = replicationManager.getMetrics()
         .getNumReplicationBytesTotal();
@@ -430,7 +464,8 @@ public class TestReplicationManager {
         .get(id).get(0).getDatanode();
     final ContainerReplica replicatedReplicaOne = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, targetDn);
-    containerStateManager.updateContainerReplica(id, replicatedReplicaOne);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicatedReplicaOne);
 
     final long currentReplicationCommandCompleted = replicationManager
         .getMetrics().getNumReplicationCmdsCompleted();
@@ -454,8 +489,7 @@ public class TestReplicationManager {
    * deletes the excess replicas.
    */
   @Test
-  public void testOverReplicatedQuasiClosedContainer() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testOverReplicatedQuasiClosedContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -468,11 +502,12 @@ public class TestReplicationManager {
     final ContainerReplica replicaFour = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
 
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -491,13 +526,17 @@ public class TestReplicationManager {
     DatanodeDetails targetDn = replicationManager.getInflightDeletion()
         .get(id).get(0).getDatanode();
     if (targetDn.equals(replicaOne.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(id, replicaOne);
+      containerStateManager.removeContainerReplica(
+          id.getProtobuf(), replicaOne);
     } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(id, replicaTwo);
+      containerStateManager.removeContainerReplica(
+          id.getProtobuf(), replicaTwo);
     } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(id, replicaThree);
+      containerStateManager.removeContainerReplica(
+          id.getProtobuf(), replicaThree);
     } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
-      containerStateManager.removeContainerReplica(id, replicaFour);
+      containerStateManager.removeContainerReplica(
+          id.getProtobuf(), replicaFour);
     }
 
     final long currentDeleteCommandCompleted = replicationManager.getMetrics()
@@ -520,7 +559,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+      throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -533,11 +572,12 @@ public class TestReplicationManager {
     final ContainerReplica replicaFour = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
 
     final int currentDeleteCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -558,7 +598,7 @@ public class TestReplicationManager {
     final long currentDeleteCommandCompleted = replicationManager.getMetrics()
         .getNumDeletionCmdsCompleted();
     // Now we remove the replica to simulate deletion complete
-    containerStateManager.removeContainerReplica(id, replicaOne);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), replicaOne);
 
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -575,8 +615,7 @@ public class TestReplicationManager {
    * under replicated.
    */
   @Test
-  public void testUnderReplicatedQuasiClosedContainer() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testUnderReplicatedQuasiClosedContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     container.setUsedBytes(100);
     final ContainerID id = container.containerID();
@@ -586,9 +625,9 @@ public class TestReplicationManager {
     final ContainerReplica replicaTwo = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
 
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
@@ -618,7 +657,8 @@ public class TestReplicationManager {
         .get(id).get(0).getDatanode();
     final ContainerReplica replicatedReplicaThree = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, targetDn);
-    containerStateManager.updateContainerReplica(id, replicatedReplicaThree);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicatedReplicaThree);
 
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -653,8 +693,8 @@ public class TestReplicationManager {
    */
   @Test
   public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
-      throws SCMException, ContainerNotFoundException, InterruptedException,
-      ContainerReplicaNotFoundException, TimeoutException {
+      throws IOException, InterruptedException,
+      TimeoutException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -663,9 +703,9 @@ public class TestReplicationManager {
     final ContainerReplica replicaTwo = getReplicas(
         id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
 
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
@@ -690,7 +730,7 @@ public class TestReplicationManager {
         replicateCommand.get().getDatanodeId());
     ContainerReplica newReplica = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
-    containerStateManager.updateContainerReplica(id, newReplica);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), newReplica);
 
     /*
      * We have report the replica to SCM, in the next ReplicationManager
@@ -711,7 +751,7 @@ public class TestReplicationManager {
     Assert.assertEquals(1, replicationManager.getMetrics()
         .getInflightDeletion());
 
-    containerStateManager.removeContainerReplica(id, replicaTwo);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), replicaTwo);
 
     final long currentDeleteCommandCompleted = replicationManager.getMetrics()
         .getNumDeletionCmdsCompleted();
@@ -748,17 +788,16 @@ public class TestReplicationManager {
    * highest BCSID.
    */
   @Test
-  public void testQuasiClosedToClosed() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testQuasiClosedToClosed() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
     final ContainerID id = container.containerID();
     final Set<ContainerReplica> replicas = getReplicas(id, State.QUASI_CLOSED,
         randomDatanodeDetails(),
         randomDatanodeDetails(),
         randomDatanodeDetails());
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     final int currentCloseCommandCount = datanodeCommandHandler
@@ -779,8 +818,7 @@ public class TestReplicationManager {
    * CLOSED and healthy.
    */
   @Test
-  public void testHealthyClosedContainer()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+  public void testHealthyClosedContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
     final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
@@ -788,9 +826,9 @@ public class TestReplicationManager {
         randomDatanodeDetails(),
         randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     replicationManager.processAll();
@@ -802,8 +840,7 @@ public class TestReplicationManager {
    * ReplicationManager should close the unhealthy OPEN container.
    */
   @Test
-  public void testUnhealthyOpenContainer()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+  public void testUnhealthyOpenContainer() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.OPEN);
     final ContainerID id = container.containerID();
     final Set<ContainerReplica> replicas = getReplicas(id, State.OPEN,
@@ -811,9 +848,9 @@ public class TestReplicationManager {
         randomDatanodeDetails());
     replicas.addAll(getReplicas(id, State.UNHEALTHY, randomDatanodeDetails()));
 
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     final CloseContainerEventHandler closeContainerHandler =
@@ -830,8 +867,7 @@ public class TestReplicationManager {
    * ReplicationManager should skip send close command to unhealthy replica.
    */
   @Test
-  public void testCloseUnhealthyReplica()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+  public void testCloseUnhealthyReplica() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
     final ContainerID id = container.containerID();
     final Set<ContainerReplica> replicas = getReplicas(id, State.UNHEALTHY,
@@ -839,9 +875,9 @@ public class TestReplicationManager {
     replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
     replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
 
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     for (ContainerReplica replica : replicas) {
-      containerStateManager.updateContainerReplica(id, replica);
+      containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
     }
 
     replicationManager.processAll();
@@ -863,8 +899,7 @@ public class TestReplicationManager {
   }
 
   @Test
-  public void additionalReplicaScheduledWhenMisReplicated()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+  public void additionalReplicaScheduledWhenMisReplicated() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     container.setUsedBytes(100);
     final ContainerID id = container.containerID();
@@ -876,10 +911,11 @@ public class TestReplicationManager {
     final ContainerReplica replicaThree = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
 
     // Ensure a mis-replicated status is returned for any containers in this
     // test where there are 3 replicas. When there are 2 or 4 replicas
@@ -939,8 +975,7 @@ public class TestReplicationManager {
   }
 
   @Test
-  public void overReplicatedButRemovingMakesMisReplicated()
-      throws SCMException, ContainerNotFoundException, InterruptedException {
+  public void overReplicatedButRemovingMakesMisReplicated() throws IOException {
     // In this test, the excess replica should not be removed.
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
@@ -956,12 +991,13 @@ public class TestReplicationManager {
     final ContainerReplica replicaFive = getReplicas(
         id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
-    containerStateManager.updateContainerReplica(id, replicaFive);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFive);
 
     // Ensure a mis-replicated status is returned for any containers in this
     // test where there are exactly 3 replicas checked.
@@ -993,8 +1029,7 @@ public class TestReplicationManager {
   }
 
   @Test
-  public void testOverReplicatedAndPolicySatisfied() throws
-      SCMException, ContainerNotFoundException, InterruptedException {
+  public void testOverReplicatedAndPolicySatisfied() throws IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -1007,11 +1042,12 @@ public class TestReplicationManager {
     final ContainerReplica replicaFour = getReplicas(
         id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
 
     Mockito.when(containerPlacementPolicy.validateContainerPlacement(
         Mockito.argThat(list -> list.size() == 3),
@@ -1035,7 +1071,7 @@ public class TestReplicationManager {
 
   @Test
   public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
-      SCMException {
+      IOException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
@@ -1050,12 +1086,13 @@ public class TestReplicationManager {
     final ContainerReplica replicaFive = getReplicas(
         id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
 
-    containerStateManager.loadContainer(container);
-    containerStateManager.updateContainerReplica(id, replicaOne);
-    containerStateManager.updateContainerReplica(id, replicaTwo);
-    containerStateManager.updateContainerReplica(id, replicaThree);
-    containerStateManager.updateContainerReplica(id, replicaFour);
-    containerStateManager.updateContainerReplica(id, replicaFive);
+    containerStateManager.addContainer(container.getProtobuf());
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+    containerStateManager.updateContainerReplica(
+        id.getProtobuf(), replicaThree);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
+    containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFive);
 
     Mockito.when(containerPlacementPolicy.validateContainerPlacement(
         Mockito.argThat(list -> list != null && list.size() <= 4),
@@ -1082,8 +1119,7 @@ public class TestReplicationManager {
    * decommissioned replicas.
    */
   @Test
-  public void testUnderReplicatedDueToDecommission() throws
-      SCMException {
+  public void testUnderReplicatedDueToDecommission() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1096,8 +1132,7 @@ public class TestReplicationManager {
    * are decommissioning.
    */
   @Test
-  public void testUnderReplicatedDueToAllDecommission() throws
-      SCMException {
+  public void testUnderReplicatedDueToAllDecommission() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1110,8 +1145,7 @@ public class TestReplicationManager {
    * correctly replicated with decommissioned replicas still present.
    */
   @Test
-  public void testCorrectlyReplicatedWithDecommission() throws
-      SCMException {
+  public void testCorrectlyReplicatedWithDecommission() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1125,8 +1159,7 @@ public class TestReplicationManager {
    * is not met for maintenance.
    */
   @Test
-  public void testUnderReplicatedDueToMaintenance() throws
-      SCMException {
+  public void testUnderReplicatedDueToMaintenance() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1179,8 +1212,7 @@ public class TestReplicationManager {
    * are going into maintenance.
    */
   @Test
-  public void testUnderReplicatedDueToAllMaintenance() throws
-      SCMException {
+  public void testUnderReplicatedDueToAllMaintenance() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1193,8 +1225,7 @@ public class TestReplicationManager {
    * replica are available.
    */
   @Test
-  public void testCorrectlyReplicatedWithMaintenance() throws
-      SCMException {
+  public void testCorrectlyReplicatedWithMaintenance() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1208,8 +1239,8 @@ public class TestReplicationManager {
    * are decommissioning or maintenance.
    */
   @Test
-  public void testUnderReplicatedWithDecommissionAndMaintenance() throws
-      SCMException {
+  public void testUnderReplicatedWithDecommissionAndMaintenance()
+      throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1226,7 +1257,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testOverReplicatedClosedContainerWithDecomAndMaint()
-      throws SCMException {
+      throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1251,7 +1282,8 @@ public class TestReplicationManager {
     // Get the DECOM and Maint replica and ensure none of them are scheduled
     // for removal
     Set<ContainerReplica> decom =
-        containerStateManager.getContainerReplicas(container.containerID())
+        containerStateManager.getContainerReplicas(
+            container.containerID().getProtobuf())
         .stream()
         .filter(r -> r.getDatanodeDetails().getPersistedOpState() != IN_SERVICE)
         .collect(Collectors.toSet());
@@ -1269,8 +1301,7 @@ public class TestReplicationManager {
    * scheduled.
    */
   @Test
-  public void testUnderReplicatedNotHealthySource()
-      throws SCMException {
+  public void testUnderReplicatedNotHealthySource() throws IOException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     addReplica(container, NodeStatus.inServiceStale(), CLOSED);
     addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
@@ -1284,7 +1315,7 @@ public class TestReplicationManager {
    * if all the prerequisites are satisfied, move should work as expected.
    */
   @Test
-  public void testMove() throws SCMException, NodeNotFoundException,
+  public void testMove() throws IOException, NodeNotFoundException,
       InterruptedException, ExecutionException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     ContainerID id = container.containerID();
@@ -1315,7 +1346,7 @@ public class TestReplicationManager {
         SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
     Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
         SCMCommandProto.Type.deleteContainerCommand));
-    containerStateManager.removeContainerReplica(id, dn1);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
 
     replicationManager.processAll();
     eventQueue.processAll(1000);
@@ -1398,10 +1429,10 @@ public class TestReplicationManager {
     //deleteContainerCommand will be sent again
     Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
         SCMCommandProto.Type.deleteContainerCommand));
-    containerStateManager.removeContainerReplica(id, dn1);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
 
     //replica in src datanode is deleted now
-    containerStateManager.removeContainerReplica(id, dn1);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
     replicationManager.processAll();
     eventQueue.processAll(1000);
 
@@ -1422,7 +1453,7 @@ public class TestReplicationManager {
    */
   @Test
   public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
-      throws SCMException, NodeNotFoundException,
+      throws IOException, NodeNotFoundException,
       InterruptedException, ExecutionException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     ContainerID id = container.containerID();
@@ -1449,7 +1480,7 @@ public class TestReplicationManager {
     //now, replication succeeds, but replica in dn2 lost,
     //and there are only tree replicas totally, so rm should
     //not delete the replica on dn1
-    containerStateManager.removeContainerReplica(id, dn2);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn2);
     replicationManager.processAll();
     eventQueue.processAll(1000);
 
@@ -1464,7 +1495,7 @@ public class TestReplicationManager {
    * test src and target datanode become unhealthy when moving.
    */
   @Test
-  public void testDnBecameUnhealthyWhenMoving() throws SCMException,
+  public void testDnBecameUnhealthyWhenMoving() throws IOException,
       NodeNotFoundException, InterruptedException, ExecutionException {
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
     ContainerID id = container.containerID();
@@ -1508,11 +1539,11 @@ public class TestReplicationManager {
    * some Prerequisites should be satisfied.
    */
   @Test
-  public void testMovePrerequisites()
-      throws SCMException, NodeNotFoundException,
-      InterruptedException, ExecutionException {
+  public void testMovePrerequisites() throws IOException, NodeNotFoundException,
+      InterruptedException, ExecutionException,
+      InvalidStateTransitionException {
     //all conditions is met
-    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    final ContainerInfo container = createContainer(LifeCycleState.OPEN);
     ContainerID id = container.containerID();
     ContainerReplica dn1 = addReplica(container,
         new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1536,17 +1567,31 @@ public class TestReplicationManager {
     replicationManager.start();
     Thread.sleep(100L);
 
-    //container in not in CLOSED state
-    for (LifeCycleState state : LifeCycleState.values()) {
-      if (state != LifeCycleState.CLOSED) {
-        container.setState(state);
-        cf = replicationManager.move(id,
-            new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
-        Assert.assertTrue(cf.isDone() && cf.get() ==
-            MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
-      }
-    }
-    container.setState(LifeCycleState.CLOSED);
+    //container is in OPEN state (i.e. not CLOSED), so the move must fail
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+    //open -> closing
+    containerStateManager.updateContainerState(id.getProtobuf(),
+        LifeCycleEvent.FINALIZE);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+    //closing -> quasi_closed
+    containerStateManager.updateContainerState(id.getProtobuf(),
+        LifeCycleEvent.QUASI_CLOSE);
+    cf = replicationManager.move(id,
+        new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+
+    //quasi_closed -> closed
+    containerStateManager.updateContainerState(id.getProtobuf(),
+        LifeCycleEvent.FORCE_CLOSE);
+    Assert.assertTrue(LifeCycleState.CLOSED ==
+        containerStateManager.getContainer(id.getProtobuf()).getState());
 
     //Node is not in healthy state
     for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
@@ -1610,8 +1655,8 @@ public class TestReplicationManager {
 
     //make the replica num be 2 to test the case
     //that container is in inflightReplication
-    containerStateManager.removeContainerReplica(id, dn5);
-    containerStateManager.removeContainerReplica(id, dn4);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn5);
+    containerStateManager.removeContainerReplica(id.getProtobuf(), dn4);
     //replication manager should generate inflightReplication
     replicationManager.processAll();
     //waiting for inflightReplication generation
@@ -1623,8 +1668,7 @@ public class TestReplicationManager {
   }
 
   @Test
-  public void testReplicateCommandTimeout() throws
-      SCMException, InterruptedException {
+  public void testReplicateCommandTimeout() throws IOException {
     long timeout = new ReplicationManagerConfiguration().getEventTimeout();
 
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
@@ -1645,7 +1689,7 @@ public class TestReplicationManager {
 
   @Test
   public void testDeleteCommandTimeout() throws
-      SCMException, InterruptedException {
+      IOException, InterruptedException {
     long timeout = new ReplicationManagerConfiguration().getEventTimeout();
 
     final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
@@ -1667,9 +1711,9 @@ public class TestReplicationManager {
   }
 
   private ContainerInfo createContainer(LifeCycleState containerState)
-      throws SCMException {
+      throws IOException {
     final ContainerInfo container = getContainer(containerState);
-    containerStateManager.loadContainer(container);
+    containerStateManager.addContainer(container.getProtobuf());
     return container;
   }
 
@@ -1707,7 +1751,7 @@ public class TestReplicationManager {
     final ContainerReplica replica = getReplicas(
         container.containerID(), replicaState, 1000L, originNodeId, dn);
     containerStateManager
-        .updateContainerReplica(container.containerID(), replica);
+        .updateContainerReplica(container.containerID().getProtobuf(), replica);
     return replica;
   }
 
@@ -1741,7 +1785,9 @@ public class TestReplicationManager {
   public void teardown() throws Exception {
     containerStateManager.close();
     replicationManager.stop();
-    dbStore.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
     FileUtils.deleteDirectory(testDir);
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
index 0be7c27..3bb9263 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
@@ -20,10 +20,13 @@ import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.UUID;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -33,13 +36,22 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,13 +67,31 @@ public class TestUnknownContainerReport {
   private ContainerManager containerManager;
   private ContainerStateManager containerStateManager;
   private EventPublisher publisher;
+  private PipelineManager pipelineManager;
+  private File testDir;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
 
   @Before
   public void setup() throws IOException {
-    final ConfigurationSource conf = new OzoneConfiguration();
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
     this.nodeManager = new MockNodeManager(true, 10);
     this.containerManager = Mockito.mock(ContainerManager.class);
-    this.containerStateManager = new ContainerStateManager(conf);
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    pipelineManager =
+        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+    containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
     this.publisher = Mockito.mock(EventPublisher.class);
 
     Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
@@ -69,12 +99,17 @@ public class TestUnknownContainerReport {
   }
 
   @After
-  public void tearDown() throws IOException {
+  public void tearDown() throws Exception {
     containerStateManager.close();
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
   @Test
-  public void testUnknownContainerNotDeleted() throws IOException {
+  public void testUnknownContainerNotDeleted() {
     OzoneConfiguration conf = new OzoneConfiguration();
     sendContainerReport(conf);
 
@@ -85,7 +120,7 @@ public class TestUnknownContainerReport {
   }
 
   @Test
-  public void testUnknownContainerDeleted() throws IOException {
+  public void testUnknownContainerDeleted() {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(
         ScmConfig.HDDS_SCM_UNKNOWN_CONTAINER_ACTION,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index 0fa6c9b..18d46c6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -20,7 +20,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -105,10 +105,10 @@ public class TestReplicationAnnotation {
     scmhaInvocationHandler = new SCMHAInvocationHandler(
         RequestType.CONTAINER, null, scmRatisServer);
 
-    ContainerStateManagerV2 proxy =
-        (ContainerStateManagerV2) Proxy.newProxyInstance(
+    ContainerStateManager proxy =
+        (ContainerStateManager) Proxy.newProxyInstance(
         SCMHAInvocationHandler.class.getClassLoader(),
-        new Class<?>[]{ContainerStateManagerV2.class}, scmhaInvocationHandler);
+        new Class<?>[]{ContainerStateManager.class}, scmhaInvocationHandler);
 
     try {
       proxy.addContainer(HddsProtos.ContainerInfoProto.getDefaultInstance());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index 018aada..6d4a1f4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -64,7 +64,7 @@ public class TestContainerStateManagerIntegration {
   private MiniOzoneCluster cluster;
   private StorageContainerManager scm;
   private ContainerManager containerManager;
-  private ContainerStateManagerV2 containerStateManager;
+  private ContainerStateManager containerStateManager;
   private int numContainerPerOwnerInPipeline;
   private  Set<ContainerID> excludedContainerIDS;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to