This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fc6a2ea144 HDDS-11650. ContainerId list to track all containers created in a datanode (#7402)
fc6a2ea144 is described below

commit fc6a2ea1445c33de880d58a84d0db0303e1d623d
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Nov 20 10:21:32 2024 -0800

    HDDS-11650. ContainerId list to track all containers created in a datanode (#7402)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../ozone/container/ContainerTestHelper.java       |  22 ++-
 .../ozone/container/common/impl/ContainerSet.java  | 130 ++++++++++++-
 .../container/common/impl/HddsDispatcher.java      |   6 +-
 .../container/common/volume/MutableVolumeSet.java  |   5 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  21 ++-
 .../container/metadata/AbstractDatanodeStore.java  | 202 +++++----------------
 .../ozone/container/metadata/AbstractRDBStore.java | 135 ++++++++++++++
 .../ozone/container/metadata/DBStoreManager.java   |  71 ++++++++
 .../ozone/container/metadata/DatanodeStore.java    |  49 +----
 .../metadata/WitnessedContainerDBDefinition.java   |  71 ++++++++
 .../metadata/WitnessedContainerMetadataStore.java  |  34 ++++
 .../WitnessedContainerMetadataStoreImpl.java       |  78 ++++++++
 .../ozone/container/ozoneimpl/ContainerReader.java |   3 +-
 .../ozoneimpl/OnDemandContainerDataScanner.java    |   3 +
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  33 +++-
 .../container/replication/ContainerImporter.java   |   2 +-
 .../common/volume/TestVolumeSetDiskChecks.java     |   1 +
 .../container/ozoneimpl/TestOzoneContainer.java    |  22 ++-
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |   9 +-
 .../hadoop/hdds/utils/db/InMemoryTestTable.java    | 133 ++++++++++++++
 .../src/main/compose/compatibility/docker-config   |   2 +-
 .../src/main/compose/ozone-balancer/docker-config  |   2 +-
 .../dist/src/main/compose/ozone-csi/docker-config  |   2 +-
 .../dist/src/main/compose/ozone-ha/docker-config   |   2 +-
 .../src/main/compose/ozone-om-ha/docker-config     |   2 +-
 .../main/compose/ozone-om-prepare/docker-config    |   2 +-
 .../src/main/compose/ozone-topology/docker-config  |   2 +-
 .../dist/src/main/compose/ozone/docker-config      |   2 +-
 .../src/main/compose/ozoneblockade/docker-config   |   2 +-
 .../src/main/compose/ozonescripts/docker-config    |   2 +-
 .../src/main/compose/ozonesecure-ha/docker-config  |   2 +-
 .../src/main/compose/ozonesecure-mr/docker-config  |   2 +-
 .../src/main/compose/ozonesecure/docker-config     |   2 +-
 .../dist/src/main/compose/restart/docker-config    |   2 +-
 .../main/compose/upgrade/compose/ha/docker-config  |   2 +-
 .../compose/upgrade/compose/non-ha/docker-config   |   2 +-
 .../compose/upgrade/compose/om-ha/docker-config    |   2 +-
 .../dist/src/main/compose/xcompat/docker-config    |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 163 ++++++++++++++++-
 .../hadoop/ozone/debug/DBDefinitionFactory.java    |   4 +-
 .../ozone/debug/container/ContainerCommands.java   |   2 +-
 .../ozone/freon/ClosedContainerReplicator.java     |  19 +-
 43 files changed, 1000 insertions(+), 255 deletions(-)
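
At its core, this change records every container created on a datanode in a new per-datanode RocksDB table (witnessed_container.db) and, on restart, replays that table against the containers actually found on disk so that lost containers surface as missing. The snippet below is a condensed, illustrative view of the logic added to OzoneContainer.buildContainerSet later in this patch (error handling omitted):

    try (TableIterator<Long, ? extends Table.KeyValue<Long, String>> itr =
             containerSet.getContainerIdsTable().iterator()) {
      Map<Long, Long> witnessedIds = new HashMap<>();
      while (itr.hasNext()) {
        // every containerId ever created on this datanode, read back from the table
        witnessedIds.put(itr.next().getKey(), 0L);
      }
      // anything recorded here but not found on the volumes is flagged as missing
      containerSet.buildMissingContainerSetAndValidate(witnessedIds);
    }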

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index e483feba98..49bfa1eae2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -122,6 +122,7 @@ public final class OzoneConsts {
   public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
   public static final String SCM_DB_BACKUP_PREFIX = "scm.db.backup.";
   public static final String CONTAINER_DB_NAME = "container.db";
+  public static final String WITNESSED_CONTAINER_DB_NAME = "witnessed_container.db";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 2b7592e1c3..20372dcc6e 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -293,18 +293,31 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getCreateContainerRequest(
       long containerID, Pipeline pipeline) throws IOException {
+    return getCreateContainerRequest(containerID, pipeline, ContainerProtos.ContainerDataProto.State.OPEN);
+  }
+
+
+  /**
+   * Returns a create container command for test purposes. There are a bunch of
+   * tests where we need to just send a request and get a reply.
+   *
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCreateContainerRequest(
+      long containerID, Pipeline pipeline, ContainerProtos.ContainerDataProto.State state) throws IOException {
     LOG.trace("addContainer: {}", containerID);
-    return getContainerCommandRequestBuilder(containerID, pipeline).build();
+    return getContainerCommandRequestBuilder(containerID, pipeline, state)
+        .build();
   }
 
   private static Builder getContainerCommandRequestBuilder(long containerID,
-      Pipeline pipeline) throws IOException {
+          Pipeline pipeline, ContainerProtos.ContainerDataProto.State state) throws IOException {
     Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.CreateContainer);
     request.setContainerID(containerID);
     request.setCreateContainer(
-        ContainerProtos.CreateContainerRequestProto.getDefaultInstance());
+        ContainerProtos.CreateContainerRequestProto.getDefaultInstance().toBuilder().setState(state).build());
     request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
 
     return request;
@@ -320,7 +333,8 @@ public final class ContainerTestHelper {
       long containerID, Pipeline pipeline, Token<?> token) throws IOException {
     LOG.trace("addContainer: {}", containerID);
 
-    Builder request = getContainerCommandRequestBuilder(containerID, pipeline);
+    Builder request = getContainerCommandRequestBuilder(containerID, pipeline,
+        ContainerProtos.ContainerDataProto.State.OPEN);
     if (token != null) {
       request.setEncodedToken(token.encodeToUrlString());
     }
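
The helper above now threads an explicit container state into the CreateContainer request. For orientation, the request it produces corresponds roughly to the following sketch (protobuf builders from ContainerProtos, as used in this patch; the containerID and datanode UUID values are placeholders):

    ContainerProtos.ContainerCommandRequestProto request =
        ContainerProtos.ContainerCommandRequestProto.newBuilder()
            .setCmdType(ContainerProtos.Type.CreateContainer)
            .setContainerID(1L)                                   // placeholder id
            .setCreateContainer(ContainerProtos.CreateContainerRequestProto.newBuilder()
                .setState(ContainerProtos.ContainerDataProto.State.RECOVERING)
                .build())
            .setDatanodeUuid("placeholder-datanode-uuid")         // tests take this from the pipeline
            .build();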
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 5335021da9..8dd35064e6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -23,8 +23,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Message;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
@@ -65,10 +69,24 @@ public class ContainerSet implements Iterable<Container<?>> {
       new ConcurrentSkipListMap<>();
   private Clock clock;
   private long recoveringTimeout;
+  private final Table<Long, String> containerIdsTable;
 
+  @VisibleForTesting
   public ContainerSet(long recoveringTimeout) {
+    this(new InMemoryTestTable<>(), recoveringTimeout);
+  }
+
+  public ContainerSet(Table<Long, String> containerIdsTable, long recoveringTimeout) {
+    this(containerIdsTable, recoveringTimeout, false);
+  }
+
+  public ContainerSet(Table<Long, String> containerIdsTable, long recoveringTimeout, boolean readOnly) {
     this.clock = Clock.system(ZoneOffset.UTC);
+    this.containerIdsTable = containerIdsTable;
     this.recoveringTimeout = recoveringTimeout;
+    if (!readOnly && containerIdsTable == null) {
+      throw new IllegalArgumentException("Container table cannot be null when container set is not read only");
+    }
   }
 
   public long getCurrentTime() {
@@ -85,22 +103,64 @@ public class ContainerSet implements Iterable<Container<?>> {
     this.recoveringTimeout = recoveringTimeout;
   }
 
+  /**
+   * Add Container to container map. This would fail if the container is already present or has been marked as missing.
+   * @param container container to be added
+   * @return If container is added to containerMap returns true, otherwise
+   * false
+   */
+  public boolean addContainer(Container<?> container) throws StorageContainerException {
+    return addContainer(container, false);
+  }
+
+  /**
+   * Add Container to container map. This succeeds even if the container was previously marked as missing
+   * (clearing that state), but fails if the container is already present.
+   * @param container container to be added
+   * @return If container is added to containerMap returns true, otherwise
+   * false
+   */
+  public boolean addContainerByOverwriteMissingContainer(Container<?> container) throws StorageContainerException {
+    return addContainer(container, true);
+  }
+
+  public void ensureContainerNotMissing(long containerId, State state) throws StorageContainerException {
+    if (missingContainerSet.contains(containerId)) {
+      throw new StorageContainerException(String.format("Container with container Id %d with state : %s is missing in" +
+          " the DN.", containerId, state),
+          ContainerProtos.Result.CONTAINER_MISSING);
+    }
+  }
+
   /**
    * Add Container to container map.
    * @param container container to be added
+   * @param overwrite if true, add the container even if it was previously marked as missing.
    * @return If container is added to containerMap returns true, otherwise
    * false
    */
-  public boolean addContainer(Container<?> container) throws
+  private boolean addContainer(Container<?> container, boolean overwrite) throws
       StorageContainerException {
     Preconditions.checkNotNull(container, "container cannot be null");
 
     long containerId = container.getContainerData().getContainerID();
+    State containerState = container.getContainerData().getState();
+    if (!overwrite) {
+      ensureContainerNotMissing(containerId, containerState);
+    }
     if (containerMap.putIfAbsent(containerId, container) == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Container with container Id {} is added to containerMap",
             containerId);
       }
+      try {
+        if (containerIdsTable != null) {
+          containerIdsTable.put(containerId, containerState.toString());
+        }
+      } catch (IOException e) {
+        throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
+      }
+      missingContainerSet.remove(containerId);
       // wish we could have done this from ContainerData.setState
       container.getContainerData().commitSpace();
       if (container.getContainerData().getState() == RECOVERING) {
@@ -122,21 +182,69 @@ public class ContainerSet implements Iterable<Container<?>> {
    * @return Container
    */
   public Container<?> getContainer(long containerId) {
-    Preconditions.checkState(containerId >= 0,
-        "Container Id cannot be negative.");
+    Preconditions.checkState(containerId >= 0, "Container Id cannot be negative.");
     return containerMap.get(containerId);
   }
 
+  /**
+   * Removes container from both memory and database. This should be used when the containerData
+   * on disk has been removed completely from the node.
+   * @param containerId
+   * @return True if container is removed from containerMap.
+   * @throws StorageContainerException
+   */
+  public boolean removeContainer(long containerId) throws StorageContainerException {
+    return removeContainer(containerId, false, true);
+  }
+
+  /**
+   * Removes containerId from memory. This needs to be used when the container is still present
+   * on disk, and the in-memory state of the container needs to be updated.
+   * @param containerId
+   * @return True if container is removed from containerMap.
+   * @throws StorageContainerException
+   */
+  public boolean removeContainerOnlyFromMemory(long containerId) throws StorageContainerException {
+    return removeContainer(containerId, false, false);
+  }
+
+  /**
+   * Marks a container as missing: removes it from the in-memory containerMap and adds it to the
+   * missing container set.
+   * @param containerId
+   * @return True if container is removed from containerMap.
+   * @throws StorageContainerException
+   */
+  public boolean removeMissingContainer(long containerId) throws StorageContainerException {
+    return removeContainer(containerId, true, false);
+  }
+
   /**
    * Removes the Container matching with specified containerId.
    * @param containerId ID of the container to remove
    * @return If container is removed from containerMap returns true, otherwise
    * false
    */
-  public boolean removeContainer(long containerId) {
+  private boolean removeContainer(long containerId, boolean markMissing, boolean removeFromDB)
+      throws StorageContainerException {
     Preconditions.checkState(containerId >= 0,
         "Container Id cannot be negative.");
+    // We need to add to the missing container set before removing from containerMap, since a concurrent
+    // write chunk operation could otherwise recreate the container in another volume if it is removed
+    // from the map before being added to the missing container set.
+    if (markMissing) {
+      missingContainerSet.add(containerId);
+    }
     Container<?> removed = containerMap.remove(containerId);
+    if (removeFromDB) {
+      try {
+        if (containerIdsTable != null) {
+          containerIdsTable.delete(containerId);
+        }
+      } catch (IOException e) {
+        throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
+      }
+    }
     if (removed == null) {
       LOG.debug("Container with containerId {} is not present in " +
           "containerMap", containerId);
@@ -190,20 +298,20 @@ public class ContainerSet implements Iterable<Container<?>> {
    *
    * @param  context StateContext
    */
-  public void handleVolumeFailures(StateContext context) {
+  public void handleVolumeFailures(StateContext context) throws StorageContainerException {
     AtomicBoolean failedVolume = new AtomicBoolean(false);
     AtomicInteger containerCount = new AtomicInteger(0);
-    containerMap.values().forEach(c -> {
+    for (Container<?> c : containerMap.values()) {
       ContainerData data = c.getContainerData();
       if (data.getVolume().isFailed()) {
-        removeContainer(data.getContainerID());
+        removeMissingContainer(data.getContainerID());
         LOG.debug("Removing Container {} as the Volume {} " +
-              "has failed", data.getContainerID(), data.getVolume());
+            "has failed", data.getContainerID(), data.getVolume());
         failedVolume.set(true);
         containerCount.incrementAndGet();
         ContainerLogger.logLost(data, "Volume failure");
       }
-    });
+    }
 
     if (failedVolume.get()) {
       try {
@@ -362,6 +470,10 @@ public class ContainerSet implements Iterable<Container<?>> {
     return missingContainerSet;
   }
 
+  public Table<Long, String> getContainerIdsTable() {
+    return containerIdsTable;
+  }
+
   /**
    * Builds the missing container set by taking a diff between total no
    * containers actually found and number of containers which actually
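
ContainerSet now exposes three removal paths that differ in whether the witnessed-container table and the missing-container set are touched. A small hypothetical helper (not part of the patch) illustrating which variant fits which situation:

import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;

final class ContainerRemovalSketch {
  static void remove(ContainerSet set, long containerId, boolean dataDeletedFromDisk,
      boolean volumeFailed) throws StorageContainerException {
    if (volumeFailed) {
      // data may still exist but is unreachable: keep the DB entry, mark the container missing
      set.removeMissingContainer(containerId);
    } else if (dataDeletedFromDisk) {
      // container is gone for good: remove from memory and from the witnessed-container table
      set.removeContainer(containerId);
    } else {
      // e.g. ContainerReader swapping in-memory instances: only the in-memory map changes
      set.removeContainerOnlyFromMemory(containerId);
    }
  }
}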
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 5fc9718415..d1ea73fbfd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -175,7 +175,8 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
     case CONTAINER_UNHEALTHY:
     case CLOSED_CONTAINER_IO:
     case DELETE_ON_OPEN_CONTAINER:
-    case UNSUPPORTED_REQUEST: // Blame client for sending unsupported request.
+    case UNSUPPORTED_REQUEST:// Blame client for sending unsupported request.
+    case CONTAINER_MISSING:
       return true;
     default:
       return false;
@@ -276,7 +277,8 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
         getMissingContainerSet().remove(containerID);
       }
     }
-    if (getMissingContainerSet().contains(containerID)) {
+    if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg)
+        && getMissingContainerSet().contains(containerID)) {
       StorageContainerException sce = new StorageContainerException(
           "ContainerID " + containerID
               + " has been lost and cannot be recreated on this DataNode",
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index e195b127d4..9afea8e6b0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -44,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +85,7 @@ public class MutableVolumeSet implements VolumeSet {
   private String clusterID;
 
   private final StorageVolumeChecker volumeChecker;
-  private Runnable failedVolumeListener;
+  private CheckedRunnable<IOException> failedVolumeListener;
   private StateContext context;
   private final StorageVolumeFactory volumeFactory;
   private final StorageVolume.VolumeType volumeType;
@@ -132,7 +133,7 @@ public class MutableVolumeSet implements VolumeSet {
     initializeVolumeSet();
   }
 
-  public void setFailedVolumeListener(Runnable runnable) {
+  public void setFailedVolumeListener(CheckedRunnable<IOException> runnable) {
     failedVolumeListener = runnable;
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index aa9c4bd953..860615e0a4 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -92,6 +92,8 @@ import 
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -119,8 +121,6 @@ import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto.State.RECOVERING;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
@@ -354,6 +354,15 @@ public class KeyValueHandler extends Handler {
     }
 
     long containerID = request.getContainerID();
+    State containerState = request.getCreateContainer().getState();
+
+    if (containerState != RECOVERING) {
+      try {
+        containerSet.ensureContainerNotMissing(containerID, containerState);
+      } catch (StorageContainerException ex) {
+        return ContainerUtils.logAndReturnError(LOG, ex, request);
+      }
+    }
 
     ContainerLayoutVersion layoutVersion =
         ContainerLayoutVersion.getConfiguredVersion(conf);
@@ -378,7 +387,11 @@ public class KeyValueHandler extends Handler {
     try {
       if (containerSet.getContainer(containerID) == null) {
         newContainer.create(volumeSet, volumeChoosingPolicy, clusterId);
-        created = containerSet.addContainer(newContainer);
+        if (RECOVERING == newContainer.getContainerState()) {
+          created = containerSet.addContainerByOverwriteMissingContainer(newContainer);
+        } else {
+          created = containerSet.addContainer(newContainer);
+        }
       } else {
         // The create container request for an already existing container can
         // arrive in case the ContainerStateMachine reapplies the transaction
@@ -1070,7 +1083,7 @@ public class KeyValueHandler extends Handler {
      * might already be in closing state here.
      */
     if (containerState == State.OPEN || containerState == State.CLOSING
-        || containerState == State.RECOVERING) {
+        || containerState == RECOVERING) {
       return;
     }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
index 88aeb3c174..d9edd6d4cb 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
@@ -17,27 +17,22 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.DBProfile;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
-import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile;
-import org.rocksdb.InfoLogLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,14 +40,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.NoSuchElementException;
 
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
-import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE;
-
 /**
  * Implementation of the {@link DatanodeStore} interface that contains
  * functionality common to all more derived datanode store implementations.
  */
-public abstract class AbstractDatanodeStore implements DatanodeStore {
+public class AbstractDatanodeStore extends AbstractRDBStore<AbstractDatanodeDBDefinition> implements DatanodeStore {
 
   private Table<String, Long> metadataTable;
 
@@ -68,12 +60,6 @@ public abstract class AbstractDatanodeStore implements 
DatanodeStore {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(AbstractDatanodeStore.class);
-  private volatile DBStore store;
-  private final AbstractDatanodeDBDefinition dbDef;
-  private final ManagedColumnFamilyOptions cfOptions;
-
-  private static DatanodeDBProfile dbProfile;
-  private final boolean openReadOnly;
 
   /**
    * Constructs the metadata store and starts the DB services.
@@ -84,114 +70,64 @@ public abstract class AbstractDatanodeStore implements 
DatanodeStore {
   protected AbstractDatanodeStore(ConfigurationSource config,
       AbstractDatanodeDBDefinition dbDef, boolean openReadOnly)
       throws IOException {
-
-    dbProfile = DatanodeDBProfile
-        .getProfile(config.getEnum(HDDS_DB_PROFILE, HDDS_DEFAULT_DB_PROFILE));
-
-    // The same config instance is used on each datanode, so we can share the
-    // corresponding column family options, providing a single shared cache
-    // for all containers on a datanode.
-    cfOptions = dbProfile.getColumnFamilyOptions(config);
-
-    this.dbDef = dbDef;
-    this.openReadOnly = openReadOnly;
-    start(config);
+    super(dbDef, config, openReadOnly);
   }
 
   @Override
-  public void start(ConfigurationSource config)
+  protected DBStore initDBStore(DBStoreBuilder dbStoreBuilder, ManagedDBOptions options, ConfigurationSource config)
       throws IOException {
-    if (this.store == null) {
-      ManagedDBOptions options = dbProfile.getDBOptions();
-      options.setCreateIfMissing(true);
-      options.setCreateMissingColumnFamilies(true);
-
-      if (this.dbDef instanceof DatanodeSchemaOneDBDefinition ||
-          this.dbDef instanceof DatanodeSchemaTwoDBDefinition) {
-        long maxWalSize = DBProfile.toLong(StorageUnit.MB.toBytes(2));
-        options.setMaxTotalWalSize(maxWalSize);
-      }
-
-      DatanodeConfiguration dc =
-          config.getObject(DatanodeConfiguration.class);
-      // Config user log files
-      InfoLogLevel level = InfoLogLevel.valueOf(
-          dc.getRocksdbLogLevel() + "_LEVEL");
-      options.setInfoLogLevel(level);
-      options.setMaxLogFileSize(dc.getRocksdbLogMaxFileSize());
-      options.setKeepLogFileNum(dc.getRocksdbLogMaxFileNum());
-      
-      if (this.dbDef instanceof DatanodeSchemaThreeDBDefinition) {
-        options.setDeleteObsoleteFilesPeriodMicros(
-            dc.getRocksdbDeleteObsoleteFilesPeriod());
-
-        // For V3, all Rocksdb dir has the same "container.db" name. So use
-        // parentDirName(storage UUID)-dbDirName as db metrics name
-        this.store = DBStoreBuilder.newBuilder(config, dbDef)
-            .setDBOptions(options)
-            .setDefaultCFOptions(cfOptions)
-            .setOpenReadOnly(openReadOnly)
-            .setDBJmxBeanNameName(dbDef.getDBLocation(config).getName() + "-" +
-                dbDef.getName())
-            .build();
-      } else {
-        this.store = DBStoreBuilder.newBuilder(config, dbDef)
-            .setDBOptions(options)
-            .setDefaultCFOptions(cfOptions)
-            .setOpenReadOnly(openReadOnly)
-            .build();
-      }
+    AbstractDatanodeDBDefinition dbDefinition = this.getDbDef();
+    if (dbDefinition instanceof DatanodeSchemaOneDBDefinition ||
+        dbDefinition instanceof DatanodeSchemaTwoDBDefinition) {
+      long maxWalSize = DBProfile.toLong(StorageUnit.MB.toBytes(2));
+      options.setMaxTotalWalSize(maxWalSize);
+    }
+    DatanodeConfiguration dc =
+        config.getObject(DatanodeConfiguration.class);
 
-      // Use the DatanodeTable wrapper to disable the table iterator on
-      // existing Table implementations retrieved from the DBDefinition.
-      // See the DatanodeTable's Javadoc for an explanation of why this is
-      // necessary.
-      metadataTable = new DatanodeTable<>(
-              dbDef.getMetadataColumnFamily().getTable(this.store));
-      checkTableStatus(metadataTable, metadataTable.getName());
-
-      // The block iterator this class returns will need to use the table
-      // iterator internally, so construct a block data table instance
-      // that does not have the iterator disabled by DatanodeTable.
-      blockDataTableWithIterator =
-              dbDef.getBlockDataColumnFamily().getTable(this.store);
-
-      blockDataTable = new DatanodeTable<>(blockDataTableWithIterator);
-      checkTableStatus(blockDataTable, blockDataTable.getName());
-
-      if (dbDef.getFinalizeBlocksColumnFamily() != null) {
-        finalizeBlocksTableWithIterator =
-            dbDef.getFinalizeBlocksColumnFamily().getTable(this.store);
-
-        finalizeBlocksTable = new DatanodeTable<>(
-            finalizeBlocksTableWithIterator);
-        checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName());
-      }
+    if (dbDefinition instanceof DatanodeSchemaThreeDBDefinition) {
+      options.setDeleteObsoleteFilesPeriodMicros(
+          dc.getRocksdbDeleteObsoleteFilesPeriod());
 
-      if (dbDef.getLastChunkInfoColumnFamily() != null) {
-        lastChunkInfoTable = new DatanodeTable<>(
-            dbDef.getLastChunkInfoColumnFamily().getTable(this.store));
-        checkTableStatus(lastChunkInfoTable, lastChunkInfoTable.getName());
-      }
+      // For V3, all RocksDB dirs have the same "container.db" name, so use
+      // parentDirName(storage UUID)-dbDirName as the db metrics name
+      dbStoreBuilder.setDBJmxBeanNameName(dbDefinition.getDBLocation(config).getName() + "-" +
+              dbDefinition.getName());
     }
-  }
-
-  @Override
-  public synchronized void stop() throws Exception {
-    if (store != null) {
-      store.close();
-      store = null;
+    DBStore dbStore = dbStoreBuilder.setDBOptions(options).build();
+
+    // Use the DatanodeTable wrapper to disable the table iterator on
+    // existing Table implementations retrieved from the DBDefinition.
+    // See the DatanodeTable's Javadoc for an explanation of why this is
+    // necessary.
+    metadataTable = new DatanodeTable<>(
+        dbDefinition.getMetadataColumnFamily().getTable(dbStore));
+    checkTableStatus(metadataTable, metadataTable.getName());
+
+    // The block iterator this class returns will need to use the table
+    // iterator internally, so construct a block data table instance
+    // that does not have the iterator disabled by DatanodeTable.
+    blockDataTableWithIterator =
+        dbDefinition.getBlockDataColumnFamily().getTable(dbStore);
+
+    blockDataTable = new DatanodeTable<>(blockDataTableWithIterator);
+    checkTableStatus(blockDataTable, blockDataTable.getName());
+
+    if (dbDefinition.getFinalizeBlocksColumnFamily() != null) {
+      finalizeBlocksTableWithIterator =
+          dbDefinition.getFinalizeBlocksColumnFamily().getTable(dbStore);
+
+      finalizeBlocksTable = new DatanodeTable<>(
+          finalizeBlocksTableWithIterator);
+      checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName());
     }
-  }
 
-  @Override
-  public DBStore getStore() {
-    return this.store;
-  }
-
-  @Override
-  public BatchOperationHandler getBatchHandler() {
-    return this.store;
+    if (dbDefinition.getLastChunkInfoColumnFamily() != null) {
+      lastChunkInfoTable = new DatanodeTable<>(
+          dbDefinition.getLastChunkInfoColumnFamily().getTable(dbStore));
+      checkTableStatus(lastChunkInfoTable, lastChunkInfoTable.getName());
+    }
+    return dbStore;
   }
 
   @Override
@@ -240,44 +176,6 @@ public abstract class AbstractDatanodeStore implements 
DatanodeStore {
         finalizeBlocksTableWithIterator.iterator(), filter);
   }
 
-  @Override
-  public synchronized boolean isClosed() {
-    if (this.store == null) {
-      return true;
-    }
-    return this.store.isClosed();
-  }
-
-  @Override
-  public void close() throws IOException {
-    this.store.close();
-    this.cfOptions.close();
-  }
-
-  @Override
-  public void flushDB() throws IOException {
-    store.flushDB();
-  }
-
-  @Override
-  public void flushLog(boolean sync) throws IOException {
-    store.flushLog(sync);
-  }
-
-  @Override
-  public void compactDB() throws IOException {
-    store.compactDB();
-  }
-
-  @VisibleForTesting
-  public DatanodeDBProfile getDbProfile() {
-    return dbProfile;
-  }
-
-  protected AbstractDatanodeDBDefinition getDbDef() {
-    return this.dbDef;
-  }
-
   protected Table<String, BlockData> getBlockDataTableWithIterator() {
     return this.blockDataTableWithIterator;
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
new file mode 100644
index 0000000000..5ce1a85b38
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
@@ -0,0 +1,135 @@
+package org.apache.hadoop.ozone.container.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile;
+import org.rocksdb.InfoLogLevel;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static 
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE;
+
+/**
+ * Abstract base class defining how to interact with a RocksDB instance in the datanode.
+ * @param <DEF> Generic parameter defining the schema for the DB.
+ */
+public abstract class AbstractRDBStore<DEF extends DBDefinition> implements DBStoreManager {
+  private final DEF dbDef;
+  private final ManagedColumnFamilyOptions cfOptions;
+  private static DatanodeDBProfile dbProfile;
+  private final boolean openReadOnly;
+  private volatile DBStore store;
+
+  protected AbstractRDBStore(DEF dbDef, ConfigurationSource config, boolean openReadOnly) throws IOException {
+    dbProfile = DatanodeDBProfile.getProfile(config.getEnum(HDDS_DB_PROFILE, HDDS_DEFAULT_DB_PROFILE));
+
+    // The same config instance is used on each datanode, so we can share the
+    // corresponding column family options, providing a single shared cache
+    // for all containers on a datanode.
+    cfOptions = dbProfile.getColumnFamilyOptions(config);
+    this.dbDef = dbDef;
+    this.openReadOnly = openReadOnly;
+    start(config);
+  }
+
+  public void start(ConfigurationSource config)
+      throws IOException {
+    if (this.store == null) {
+      ManagedDBOptions options = dbProfile.getDBOptions();
+      options.setCreateIfMissing(true);
+      options.setCreateMissingColumnFamilies(true);
+
+      DatanodeConfiguration dc =
+          config.getObject(DatanodeConfiguration.class);
+      // Config user log files
+      InfoLogLevel level = InfoLogLevel.valueOf(
+          dc.getRocksdbLogLevel() + "_LEVEL");
+      options.setInfoLogLevel(level);
+      options.setMaxLogFileSize(dc.getRocksdbLogMaxFileSize());
+      options.setKeepLogFileNum(dc.getRocksdbLogMaxFileNum());
+      this.store = initDBStore(DBStoreBuilder.newBuilder(config, dbDef)
+          .setDBOptions(options)
+          .setDefaultCFOptions(cfOptions)
+          .setOpenReadOnly(openReadOnly), options, config);
+    }
+  }
+
+  protected abstract DBStore initDBStore(DBStoreBuilder dbStoreBuilder, ManagedDBOptions options,
+                                         ConfigurationSource config) throws IOException;
+
+  public synchronized void stop() throws Exception {
+    if (store != null) {
+      store.close();
+      store = null;
+    }
+  }
+
+  public DBStore getStore() {
+    return this.store;
+  }
+
+  public synchronized boolean isClosed() {
+    if (this.store == null) {
+      return true;
+    }
+    return this.store.isClosed();
+  }
+
+  public BatchOperationHandler getBatchHandler() {
+    return this.store;
+  }
+
+  public void close() throws IOException {
+    this.store.close();
+    this.cfOptions.close();
+  }
+
+  public void flushDB() throws IOException {
+    store.flushDB();
+  }
+
+  public void flushLog(boolean sync) throws IOException {
+    store.flushLog(sync);
+  }
+
+  public void compactDB() throws IOException {
+    store.compactDB();
+  }
+
+  @VisibleForTesting
+  public DatanodeDBProfile getDbProfile() {
+    return dbProfile;
+  }
+
+  protected DEF getDbDef() {
+    return this.dbDef;
+  }
+
+}
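
AbstractRDBStore is a template: the base class owns the DB profile, options and lifecycle, and a subclass only implements initDBStore() to build the DBStore and fetch its table handles (WitnessedContainerMetadataStoreImpl later in this patch is the concrete case). A minimal hypothetical subclass, assuming the WitnessedContainerDBDefinition also added in this patch:

import java.io.IOException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;

public final class ExampleRDBStore extends AbstractRDBStore<WitnessedContainerDBDefinition> {

  ExampleRDBStore(ConfigurationSource config) throws IOException {
    super(WitnessedContainerDBDefinition.get(), config, /* openReadOnly */ false);
  }

  @Override
  protected DBStore initDBStore(DBStoreBuilder builder, ManagedDBOptions options,
      ConfigurationSource config) throws IOException {
    // a real subclass would typically fetch its Table handles from the built store here
    return builder.build();
  }
}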
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DBStoreManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DBStoreManager.java
new file mode 100644
index 0000000000..ec9849950a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DBStoreManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.metadata;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for interacting with datanode databases.
+ */
+public interface DBStoreManager extends Closeable {
+
+  /**
+   * Start datanode manager.
+   *
+   * @param configuration - Configuration
+   * @throws IOException - Unable to start datanode store.
+   */
+  void start(ConfigurationSource configuration) throws IOException;
+
+  /**
+   * Stop datanode manager.
+   */
+  void stop() throws Exception;
+
+  /**
+   * Get datanode store.
+   *
+   * @return datanode store.
+   */
+  DBStore getStore();
+
+  /**
+   * Helper to create and write batch transactions.
+   */
+  BatchOperationHandler getBatchHandler();
+
+  void flushLog(boolean sync) throws IOException;
+
+  void flushDB() throws IOException;
+
+  void compactDB() throws IOException;
+
+  /**
+   * Returns if the underlying DB is closed. This call is thread safe.
+   * @return true if the DB is closed.
+   */
+  boolean isClosed();
+
+  default void compactionIfNeeded() throws Exception {
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
index d791d9bbea..3ebdc3f629 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
@@ -17,22 +17,16 @@
  */
 package org.apache.hadoop.ozone.container.metadata;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
-import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
@@ -40,31 +34,10 @@ import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 /**
  * Interface for interacting with datanode databases.
  */
-public interface DatanodeStore extends Closeable {
+public interface DatanodeStore extends DBStoreManager {
   String NO_SUCH_BLOCK_ERR_MSG =
           "Unable to find the block.";
 
-  /**
-   * Start datanode manager.
-   *
-   * @param configuration - Configuration
-   * @throws IOException - Unable to start datanode store.
-   */
-  void start(ConfigurationSource configuration) throws IOException;
-
-  /**
-   * Stop datanode manager.
-   */
-  void stop() throws Exception;
-
-  /**
-   * Get datanode store.
-   *
-   * @return datanode store.
-   */
-  @VisibleForTesting
-  DBStore getStore();
-
   /**
    * A Table that keeps the block data.
    *
@@ -100,17 +73,6 @@ public interface DatanodeStore extends Closeable {
    */
   Table<String, BlockData> getLastChunkInfoTable();
 
-  /**
-   * Helper to create and write batch transactions.
-   */
-  BatchOperationHandler getBatchHandler();
-
-  void flushLog(boolean sync) throws IOException;
-
-  void flushDB() throws IOException;
-
-  void compactDB() throws IOException;
-
   BlockIterator<BlockData> getBlockIterator(long containerID)
       throws IOException;
 
@@ -120,15 +82,6 @@ public interface DatanodeStore extends Closeable {
   BlockIterator<Long> getFinalizeBlockIterator(long containerID,
       KeyPrefixFilter filter) throws IOException;
 
-  /**
-   * Returns if the underlying DB is closed. This call is thread safe.
-   * @return true if the DB is closed.
-   */
-  boolean isClosed();
-
-  default void compactionIfNeeded() throws Exception {
-  }
-
   default BlockData getBlockByID(BlockID blockID,
       String blockKey) throws IOException {
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerDBDefinition.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerDBDefinition.java
new file mode 100644
index 0000000000..a15ab27a69
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerDBDefinition.java
@@ -0,0 +1,71 @@
+package org.apache.hadoop.ozone.container.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+import java.util.Map;
+
+/**
+ * Class defining the schema for the master volume DB in a datanode.
+ */
+public final class WitnessedContainerDBDefinition extends DBDefinition.WithMap {
+
+  private static final String CONTAINER_IDS_TABLE_NAME = "containerIds";
+
+  public static final DBColumnFamilyDefinition<Long, String>
+      CONTAINER_IDS_TABLE = new DBColumnFamilyDefinition<>(
+      CONTAINER_IDS_TABLE_NAME,
+      LongCodec.get(),
+      StringCodec.get());
+
+  private static final Map<String, DBColumnFamilyDefinition<?, ?>>
+      COLUMN_FAMILIES = DBColumnFamilyDefinition.newUnmodifiableMap(
+      CONTAINER_IDS_TABLE);
+
+  private static final WitnessedContainerDBDefinition INSTANCE = new WitnessedContainerDBDefinition();
+
+  public static WitnessedContainerDBDefinition get() {
+    return INSTANCE;
+  }
+
+  private WitnessedContainerDBDefinition() {
+    super(COLUMN_FAMILIES);
+  }
+
+  @Override
+  public String getName() {
+    return OzoneConsts.WITNESSED_CONTAINER_DB_NAME;
+  }
+
+  @Override
+  public String getLocationConfigKey() {
+    return ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR;
+  }
+
+  public DBColumnFamilyDefinition<Long, String> getContainerIdsTable() {
+    return CONTAINER_IDS_TABLE;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStore.java
new file mode 100644
index 0000000000..b16c7b981c
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStore.java
@@ -0,0 +1,34 @@
+package org.apache.hadoop.ozone.container.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.hadoop.hdds.utils.db.Table;
+
+/**
+ * Interface for interacting with the database in the master volume of a datanode.
+ */
+public interface WitnessedContainerMetadataStore extends DBStoreManager {
+  /**
+   * A Table that keeps the containerIds in a datanode.
+   *
+   * @return Table
+   */
+  Table<Long, String> getContainerIdsTable();
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStoreImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStoreImpl.java
new file mode 100644
index 0000000000..270daf815b
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/WitnessedContainerMetadataStoreImpl.java
@@ -0,0 +1,78 @@
+package org.apache.hadoop.ozone.container.metadata;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Class for interacting with the database in the master volume of a datanode.
+ */
+public final class WitnessedContainerMetadataStoreImpl extends AbstractRDBStore<WitnessedContainerDBDefinition>
+    implements WitnessedContainerMetadataStore {
+
+  private Table<Long, String> containerIdsTable;
+  private static final ConcurrentMap<String, WitnessedContainerMetadataStore> INSTANCES =
+      new ConcurrentHashMap<>();
+
+  public static WitnessedContainerMetadataStore get(ConfigurationSource conf)
+      throws IOException {
+    String dbDirPath = DBStoreBuilder.getDBDirPath(WitnessedContainerDBDefinition.get(), conf).getAbsolutePath();
+    try {
+      return INSTANCES.compute(dbDirPath, (k, v) -> {
+        if (v == null || v.isClosed()) {
+          try {
+            return new WitnessedContainerMetadataStoreImpl(conf, false);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+        return v;
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
+  }
+
+  private WitnessedContainerMetadataStoreImpl(ConfigurationSource config, boolean openReadOnly) throws IOException {
+    super(WitnessedContainerDBDefinition.get(), config, openReadOnly);
+  }
+
+  @Override
+  protected DBStore initDBStore(DBStoreBuilder dbStoreBuilder, ManagedDBOptions options, ConfigurationSource config)
+      throws IOException {
+    DBStore dbStore = dbStoreBuilder.build();
+    this.containerIdsTable = this.getDbDef().getContainerIdsTable().getTable(dbStore);
+    return dbStore;
+  }
+
+  @Override
+  public Table<Long, String> getContainerIdsTable() {
+    return containerIdsTable;
+  }
+}
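
Putting the pieces together, the store is expected to be opened and used roughly as follows (a sketch; the directory value is a placeholder, exception handling is omitted, and real callers go through OzoneContainer as shown below):

OzoneConfiguration conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ID_DIR, "/tmp/dn-meta");   // placeholder path
WitnessedContainerMetadataStore store = WitnessedContainerMetadataStoreImpl.get(conf);
try {
  store.getContainerIdsTable().put(42L, "OPEN");                     // record a witnessed container
  String state = store.getContainerIdsTable().get(42L);              // -> "OPEN"
} finally {
  store.stop();                                                      // closes the underlying RocksDB
}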
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 1685d1c5fe..027fbff89c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -320,8 +320,7 @@ public class ContainerReader implements Runnable {
 
   private void swapAndRemoveContainer(KeyValueContainer existing,
       KeyValueContainer toAdd) throws IOException {
-    containerSet.removeContainer(
-        existing.getContainerData().getContainerID());
+    containerSet.removeContainerOnlyFromMemory(existing.getContainerData().getContainerID());
     containerSet.addContainer(toAdd);
     KeyValueContainerUtil.removeContainer(existing.getContainerData(),
         hddsVolume.getConf());
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
index 44884c5c29..edac2f596e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
@@ -80,6 +80,9 @@ public final class OnDemandContainerDataScanner {
   }
 
   private static boolean shouldScan(Container<?> container) {
+    if (container == null) {
+      return false;
+    }
     long containerID = container.getContainerData().getContainerID();
     if (instance == null) {
       LOG.debug("Skipping on demand scan for container {} since scanner was " +
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 56c4233836..5307f393e0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -58,6 +61,8 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
 import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
+import org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStore;
+import org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStoreImpl;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
@@ -71,6 +76,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -129,6 +135,7 @@ public class OzoneContainer {
   private ScheduledExecutorService dbCompactionExecutorService;
 
   private final ContainerMetrics metrics;
+  private WitnessedContainerMetadataStore witnessedContainerMetadataStore;
 
   enum InitializingStatus {
     UNINITIALIZED, INITIALIZING, INITIALIZED
@@ -179,12 +186,11 @@ public class OzoneContainer {
             TimeUnit.MINUTES);
       }
     }
-
     long recoveringContainerTimeout = config.getTimeDuration(
         OZONE_RECOVERING_CONTAINER_TIMEOUT,
         OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-
-    containerSet = new ContainerSet(recoveringContainerTimeout);
+    this.witnessedContainerMetadataStore = WitnessedContainerMetadataStoreImpl.get(conf);
+    containerSet = new ContainerSet(witnessedContainerMetadataStore.getContainerIdsTable(), recoveringContainerTimeout);
     metadataScanner = null;
 
     metrics = ContainerMetrics.create(conf);
@@ -305,7 +311,7 @@ public class OzoneContainer {
    * Build's container map after volume format.
    */
   @VisibleForTesting
-  public void buildContainerSet() {
+  public void buildContainerSet() throws IOException {
     Iterator<StorageVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
     ArrayList<Thread> volumeThreads = new ArrayList<>();
@@ -333,6 +339,14 @@ public class OzoneContainer {
       for (int i = 0; i < volumeThreads.size(); i++) {
         volumeThreads.get(i).join();
       }
+      try (TableIterator<Long, ? extends Table.KeyValue<Long, String>> itr =
+               containerSet.getContainerIdsTable().iterator()) {
+        Map<Long, Long> containerIds = new HashMap<>();
+        while (itr.hasNext()) {
+          containerIds.put(itr.next().getKey(), 0L);
+        }
+        containerSet.buildMissingContainerSetAndValidate(containerIds);
+      }
     } catch (InterruptedException ex) {
       LOG.error("Volume Threads Interrupted exception", ex);
       Thread.currentThread().interrupt();
@@ -529,9 +543,18 @@ public class OzoneContainer {
     recoveringContainerScrubbingService.shutdown();
     IOUtils.closeQuietly(metrics);
     ContainerMetrics.remove();
+    if (this.witnessedContainerMetadataStore != null) {
+      try {
+        this.witnessedContainerMetadataStore.stop();
+      } catch (Exception e) {
+        LOG.error("Error while stopping witnessedContainerMetadataStore. Status of store: {}",
+            witnessedContainerMetadataStore.isClosed(), e);
+      }
+      this.witnessedContainerMetadataStore = null;
+    }
   }
 
-  public void handleVolumeFailures() {
+  public void handleVolumeFailures() throws StorageContainerException {
     if (containerSet != null) {
       containerSet.handleVolumeFailures(context);
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index f20094079c..58a5d67463 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -128,7 +128,7 @@ public class ContainerImporter {
       try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
         Container container = controller.importContainer(
             containerData, input, packer);
-        containerSet.addContainer(container);
+        containerSet.addContainerByOverwriteMissingContainer(container);
       }
     } finally {
       importContainerProgress.remove(containerID);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
index 55df5f43b6..0b24161aad 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestVolumeSetDiskChecks.java
@@ -340,6 +340,7 @@ public class TestVolumeSetDiskChecks {
     conSet.handleVolumeFailures(stateContext);
     // ContainerID1 should be removed belonging to failed volume
     assertNull(conSet.getContainer(containerID1));
+    assertTrue(conSet.getMissingContainerSet().contains(containerID1));
     // ContainerID should exist belonging to normal volume
     assertNotNull(conSet.getContainer(containerID));
     expectedReportCount.put(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 60552e7cc9..2f2cbc81e9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -51,7 +52,9 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.HashMap;
 import java.util.List;
@@ -122,7 +125,7 @@ public class TestOzoneContainer {
       volume.format(clusterId);
       commitSpaceMap.put(getVolumeKey(volume), Long.valueOf(0));
     }
-
+    List<KeyValueContainerData> containerDatas = new ArrayList<>();
     // Add containers to disk
     int numTestContainers = 10;
     for (int i = 0; i < numTestContainers; i++) {
@@ -136,6 +139,7 @@ public class TestOzoneContainer {
           layout,
           maxCap, UUID.randomUUID().toString(),
           datanodeDetails.getUuidString());
+      containerDatas.add(keyValueContainerData);
       keyValueContainer = new KeyValueContainer(
           keyValueContainerData, conf);
       keyValueContainer.create(volumeSet, volumeChoosingPolicy, clusterId);
@@ -156,8 +160,22 @@ public class TestOzoneContainer {
     ozoneContainer.buildContainerSet();
     ContainerSet containerset = ozoneContainer.getContainerSet();
     assertEquals(numTestContainers, containerset.containerCount());
-
     verifyCommittedSpace(ozoneContainer);
+    Set<Long> missingContainers = new HashSet<>();
+    for (int i = 0; i < numTestContainers; i++) {
+      if (i % 2 == 0) {
+        missingContainers.add(containerDatas.get(i).getContainerID());
+        FileUtils.deleteDirectory(new File(containerDatas.get(i).getContainerPath()));
+      }
+    }
+    ozoneContainer.stop();
+    ozoneContainer = ContainerTestUtils.getOzoneContainer(datanodeDetails, conf);
+    ozoneContainer.buildContainerSet();
+    containerset = ozoneContainer.getContainerSet();
+    assertEquals(numTestContainers / 2, containerset.containerCount());
+    assertEquals(numTestContainers / 2 + numTestContainers % 2, containerset.getMissingContainerSet().size());
+    assertEquals(missingContainers, containerset.getMissingContainerSet());
+    ozoneContainer.stop();
   }
 
   @ContainerTestVersionInfo.ContainerTest
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index ed8d145b66..1e42241ee4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -163,7 +163,8 @@ public final class DBStoreBuilder {
         OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT, StorageUnit.BYTES);
   }
 
-  private void applyDBDefinition(DBDefinition definition) {
+  public static File getDBDirPath(DBDefinition definition,
+                                  ConfigurationSource configuration) {
     // Set metadata dirs.
     File metadataDir = definition.getDBLocation(configuration);
 
@@ -174,6 +175,12 @@ public final class DBStoreBuilder {
           HddsConfigKeys.OZONE_METADATA_DIRS);
       metadataDir = getOzoneMetaDirPath(configuration);
     }
+    return metadataDir;
+  }
+
+  private void applyDBDefinition(DBDefinition definition) {
+    // Set metadata dirs.
+    File metadataDir = getDBDirPath(definition, configuration);
 
     setName(definition.getName());
     setPath(Paths.get(metadataDir.getPath()));
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
new file mode 100644
index 0000000000..9cc1695298
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java
@@ -0,0 +1,133 @@
+package org.apache.hadoop.hdds.utils.db;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * InMemory Table implementation for tests.
+ */
+public final class InMemoryTestTable<KEY, VALUE> implements Table<KEY, VALUE> {
+  private final Map<KEY, VALUE> map = new ConcurrentHashMap<>();
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void put(KEY key, VALUE value) {
+    map.put(key, value);
+  }
+
+  @Override
+  public void putWithBatch(BatchOperation batch, KEY key, VALUE value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean isExist(KEY key) {
+    return map.containsKey(key);
+  }
+
+  @Override
+  public VALUE get(KEY key) {
+    return map.get(key);
+  }
+
+  @Override
+  public VALUE getIfExist(KEY key) {
+    return map.get(key);
+  }
+
+  @Override
+  public void delete(KEY key) {
+    map.remove(key);
+  }
+
+  @Override
+  public void deleteWithBatch(BatchOperation batch, KEY key) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteRange(KEY beginKey, KEY endKey) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TableIterator<KEY, ? extends KeyValue<KEY, VALUE>> iterator(KEY prefix) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getName() {
+    return "";
+  }
+
+  @Override
+  public long getEstimatedKeyCount() {
+    return map.size();
+  }
+
+  @Override
+  public List<? extends KeyValue<KEY, VALUE>> getRangeKVs(KEY startKey, int count, KEY prefix,
+                                                          MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<? extends KeyValue<KEY, VALUE>> getSequentialRangeKVs(KEY startKey, int count, KEY prefix,
+                                                                    MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void deleteBatchWithPrefix(BatchOperation batch, KEY prefix) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void dumpToFileWithPrefix(File externalFile, KEY prefix) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void loadFromFile(File externalFile) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hadoop-ozone/dist/src/main/compose/compatibility/docker-config b/hadoop-ozone/dist/src/main/compose/compatibility/docker-config
index d3984110d8..f7f1c24b8a 100644
--- a/hadoop-ozone/dist/src/main/compose/compatibility/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/compatibility/docker-config
@@ -21,7 +21,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-balancer/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-balancer/docker-config
index 10d9f5c8cf..f4866c4240 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-balancer/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-balancer/docker-config
@@ -34,7 +34,7 @@ OZONE-SITE.XML_ozone.scm.address.scmservice.scm1=scm1
 OZONE-SITE.XML_ozone.scm.address.scmservice.scm2=scm2
 OZONE-SITE.XML_ozone.scm.address.scmservice.scm3=scm3
 OZONE-SITE.XML_ozone.scm.ratis.enable=true
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.container.size=100MB
 OZONE-SITE.XML_ozone.scm.block.size=20MB
 OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
index 623f959558..ba4d80a9d0 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-csi/docker-config
@@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
index 08c490ea51..ebf2ce532b 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-ha/docker-config
@@ -34,7 +34,7 @@ OZONE-SITE.XML_ozone.scm.address.scmservice.scm1=scm1
 OZONE-SITE.XML_ozone.scm.address.scmservice.scm2=scm2
 OZONE-SITE.XML_ozone.scm.address.scmservice.scm3=scm3
 OZONE-SITE.XML_ozone.scm.ratis.enable=true
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 65834455ea..ae2fb092be 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -23,7 +23,7 @@ OZONE-SITE.XML_ozone.om.address.omservice.om2=om2
 OZONE-SITE.XML_ozone.om.address.omservice.om3=om3
 OZONE-SITE.XML_ozone.om.ratis.enable=true
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-prepare/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-prepare/docker-config
index 79d2e5285f..f0ec8fcaa1 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-prepare/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-prepare/docker-config
@@ -24,7 +24,7 @@ OZONE-SITE.XML_ozone.om.address.omservice.om3=om3
 OZONE-SITE.XML_ozone.om.ratis.enable=true
 
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
index 8239aad2a5..59b1fcf8ca 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
@@ -24,7 +24,7 @@ OZONE-SITE.XML_ozone.ozone.scm.block.size=64MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
index a657f22340..f2a9e04479 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config
@@ -29,7 +29,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
index 06696a0e41..87b0cb5053 100644
--- a/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozoneblockade/docker-config
@@ -19,7 +19,7 @@ CORE-SITE.XML_fs.defaultFS=ofs://om
 OZONE-SITE.XML_ozone.om.address=om
 OZONE-SITE.XML_ozone.om.http-address=om:9874
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.handler.type=distributed
diff --git a/hadoop-ozone/dist/src/main/compose/ozonescripts/docker-config b/hadoop-ozone/dist/src/main/compose/ozonescripts/docker-config
index 66f4cf151e..adfaeb287d 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonescripts/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonescripts/docker-config
@@ -17,7 +17,7 @@
 CORE-SITE.XML_fs.defaultFS=hdfs://namenode:9000
 OZONE-SITE.XML_ozone.ksm.address=ksm
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.om.address=om
 OZONE-SITE.XML_ozone.om.http-address=om:9874
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 38cc5b71a1..1495e89813 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -47,7 +47,7 @@ OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.handler.type=distributed
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
index 12a7819d1a..2a58ffcf38 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
@@ -22,7 +22,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.handler.type=distributed
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index 4f13d62496..387a1c8517 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.handler.type=distributed
diff --git a/hadoop-ozone/dist/src/main/compose/restart/docker-config b/hadoop-ozone/dist/src/main/compose/restart/docker-config
index 161af7a297..852eb6647c 100644
--- a/hadoop-ozone/dist/src/main/compose/restart/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/restart/docker-config
@@ -21,7 +21,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/upgrade/compose/ha/docker-config b/hadoop-ozone/dist/src/main/compose/upgrade/compose/ha/docker-config
index a1b6da80c4..d06d3279dc 100644
--- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/ha/docker-config
@@ -35,7 +35,7 @@ OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1
 
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
 OZONE-SITE.XML_hdds.datanode.volume.min.free.space=100MB
diff --git a/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config b/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
index 88126ddf2c..ce4a8807e5 100644
--- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
@@ -25,7 +25,7 @@ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.client.address=scm
diff --git a/hadoop-ozone/dist/src/main/compose/upgrade/compose/om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/upgrade/compose/om-ha/docker-config
index 77fa2b40ee..a049ba5f01 100644
--- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/om-ha/docker-config
@@ -27,7 +27,7 @@ OZONE-SITE.XML_ozone.om.ratis.enable=true
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
 OZONE-SITE.XML_ozone.scm.names=scm
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.client.address=scm
diff --git a/hadoop-ozone/dist/src/main/compose/xcompat/docker-config b/hadoop-ozone/dist/src/main/compose/xcompat/docker-config
index 1a61aaf4f7..746b2b6e94 100644
--- a/hadoop-ozone/dist/src/main/compose/xcompat/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/xcompat/docker-config
@@ -32,7 +32,7 @@ OZONE-SITE.XML_ozone.scm.block.client.address=scm
 OZONE-SITE.XML_ozone.scm.client.address=scm
 OZONE-SITE.XML_ozone.scm.container.size=1GB
 OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
-OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
+OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata
 OZONE-SITE.XML_ozone.scm.names=scm
 OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
 OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 8e72d5e5d9..553ea03f1f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -31,11 +32,13 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -158,6 +161,159 @@ public class TestOzoneContainer {
     }
   }
 
+  @Test
+  public void testOzoneContainerWithMissingContainer() throws Exception {
+    MiniOzoneCluster cluster = null;
+    try {
+      long containerID =
+          ContainerTestHelper.getTestContainerID();
+      OzoneConfiguration conf = newOzoneConfiguration();
+
+      // Start ozone container Via Datanode create.
+      cluster = MiniOzoneCluster.newBuilder(conf)
+          .setNumDatanodes(1)
+          .build();
+      cluster.waitForClusterToBeReady();
+
+      runTestOzoneContainerWithMissingContainer(cluster, containerID);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void runTestOzoneContainerWithMissingContainer(
+      MiniOzoneCluster cluster, long testContainerID) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto
+        request, writeChunkRequest, putBlockRequest,
+        updateRequest1, updateRequest2;
+    ContainerProtos.ContainerCommandResponseProto response,
+        updateResponse1, updateResponse2;
+    XceiverClientGrpc client = null;
+    try {
+      // This client talks to ozone container via datanode.
+      client = createClientForTesting(cluster);
+      client.connect();
+      Pipeline pipeline = client.getPipeline();
+      createContainerForTesting(client, testContainerID);
+      writeChunkRequest = writeChunkForContainer(client, testContainerID,
+          1024);
+
+      DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0).getDatanodeDetails();
+      File containerPath =
+          new File(cluster.getHddsDatanode(datanodeDetails).getDatanodeStateMachine()
+              .getContainer().getContainerSet().getContainer(testContainerID)
+              .getContainerData().getContainerPath());
+      cluster.getHddsDatanode(datanodeDetails).stop();
+      FileUtils.deleteDirectory(containerPath);
+
+      // Restart & Check if the container has been marked as missing, since the container directory has been deleted.
+      cluster.restartHddsDatanode(datanodeDetails, false);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return cluster.getHddsDatanode(datanodeDetails).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getMissingContainerSet().contains(testContainerID);
+        } catch (IOException e) {
+          return false;
+        }
+      }, 1000, 30000);
+
+      // Read Chunk
+      request = ContainerTestHelper.getReadChunkRequest(
+          pipeline, writeChunkRequest.getWriteChunk());
+
+      response = client.sendCommand(request);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND, response.getResult());
+
+      response = createContainerForTesting(client, testContainerID);
+      assertEquals(ContainerProtos.Result.CONTAINER_MISSING, response.getResult());
+
+      // Put Block
+      putBlockRequest = ContainerTestHelper.getPutBlockRequest(
+          pipeline, writeChunkRequest.getWriteChunk());
+
+      response = client.sendCommand(putBlockRequest);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.CONTAINER_MISSING, response.getResult());
+
+      // Write chunk
+      response = client.sendCommand(writeChunkRequest);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.CONTAINER_MISSING, response.getResult());
+
+      // Get Block
+      request = ContainerTestHelper.
+          getBlockRequest(pipeline, putBlockRequest.getPutBlock());
+      response = client.sendCommand(request);
+      assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND, response.getResult());
+
+      // Create Container
+      request  = ContainerTestHelper.getCreateContainerRequest(testContainerID, pipeline);
+      response = client.sendCommand(request);
+      assertEquals(ContainerProtos.Result.CONTAINER_MISSING, response.getResult());
+
+      // Delete Block and Delete Chunk are handled by BlockDeletingService
+      // ContainerCommandRequestProto DeleteBlock and DeleteChunk requests
+      // are deprecated
+
+      //Update an existing container
+      Map<String, String> containerUpdate = new HashMap<String, String>();
+      containerUpdate.put("container_updated_key", "container_updated_value");
+      updateRequest1 = ContainerTestHelper.getUpdateContainerRequest(
+          testContainerID, containerUpdate);
+      updateResponse1 = client.sendCommand(updateRequest1);
+      assertNotNull(updateResponse1);
+      assertEquals(ContainerProtos.Result.CONTAINER_MISSING, updateResponse1.getResult());
+
+      //Update an non-existing container
+      long nonExistingContinerID =
+          ContainerTestHelper.getTestContainerID();
+      updateRequest2 = ContainerTestHelper.getUpdateContainerRequest(
+          nonExistingContinerID, containerUpdate);
+      updateResponse2 = client.sendCommand(updateRequest2);
+      assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND,
+          updateResponse2.getResult());
+
+      // Restarting again & checking if the container is still not present on disk and marked as missing, this is to
+      // ensure the previous write request didn't inadvertently create the container data.
+      cluster.restartHddsDatanode(datanodeDetails, false);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return cluster.getHddsDatanode(datanodeDetails).getDatanodeStateMachine()
+              .getContainer().getContainerSet()
+              .getMissingContainerSet().contains(testContainerID);
+        } catch (IOException e) {
+          return false;
+        }
+      }, 1000, 30000);
+      // Create Recovering Container
+      request  = ContainerTestHelper.getCreateContainerRequest(testContainerID, pipeline,
+          ContainerProtos.ContainerDataProto.State.RECOVERING);
+      response = client.sendCommand(request);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      //write chunk on recovering container
+      response = client.sendCommand(writeChunkRequest);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      //write chunk on recovering container
+      response = client.sendCommand(putBlockRequest);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      //Get block on the recovering container should succeed now.
+      request = ContainerTestHelper.getBlockRequest(pipeline, putBlockRequest.getPutBlock());
+      response = client.sendCommand(request);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
   public static void runTestOzoneContainerViaDataNode(
       long testContainerID, XceiverClientSpi client) throws Exception {
     ContainerProtos.ContainerCommandRequestProto
@@ -504,10 +660,14 @@ public class TestOzoneContainer {
       MiniOzoneCluster cluster) {
     Pipeline pipeline = cluster.getStorageContainerManager()
         .getPipelineManager().getPipelines().iterator().next();
+    return createClientForTesting(pipeline, cluster);
+  }
+
+  private static XceiverClientGrpc createClientForTesting(Pipeline pipeline, MiniOzoneCluster cluster) {
     return new XceiverClientGrpc(pipeline, cluster.getConf());
   }
 
-  public static void createContainerForTesting(XceiverClientSpi client,
+  public static ContainerProtos.ContainerCommandResponseProto 
createContainerForTesting(XceiverClientSpi client,
       long containerID) throws Exception {
     // Create container
     ContainerProtos.ContainerCommandRequestProto request =
@@ -516,6 +676,7 @@ public class TestOzoneContainer {
     ContainerProtos.ContainerCommandResponseProto response =
         client.sendCommand(request);
     assertNotNull(response);
+    return response;
   }
 
   public static ContainerProtos.ContainerCommandRequestProto
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBDefinitionFactory.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBDefinitionFactory.java
index 87482cb549..ca79aa41fa 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBDefinitionFactory.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBDefinitionFactory.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.utils.db.DBDefinition;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.metadata.WitnessedContainerDBDefinition;
 import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaOneDBDefinition;
 import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
 import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaTwoDBDefinition;
@@ -56,7 +57,8 @@ public final class DBDefinitionFactory {
 
   static {
     final Map<String, DBDefinition> map = new HashMap<>();
-    Arrays.asList(SCMDBDefinition.get(), OMDBDefinition.get(), ReconSCMDBDefinition.get())
+    Arrays.asList(SCMDBDefinition.get(), OMDBDefinition.get(), ReconSCMDBDefinition.get(),
+                  WitnessedContainerDBDefinition.get())
         .forEach(dbDefinition -> map.put(dbDefinition.getName(), dbDefinition));
     DB_MAP = Collections.unmodifiableMap(map);
   }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index 5592926bf8..47260d62f7 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -115,7 +115,7 @@ public class ContainerCommands implements Callable<Void>, SubcommandWithParent {
   public void loadContainersFromVolumes() throws IOException {
     OzoneConfiguration conf = parent.getOzoneConf();
 
-    ContainerSet containerSet = new ContainerSet(1000);
+    ContainerSet containerSet = new ContainerSet(null, 1000, true);
 
     ContainerMetrics metrics = ContainerMetrics.create(conf);
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 393c7e599c..656251424b 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStore;
+import org.apache.hadoop.ozone.container.metadata.WitnessedContainerMetadataStoreImpl;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
@@ -82,11 +84,22 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
   private ContainerReplicator replicator;
 
   private Timer timer;
+  private WitnessedContainerMetadataStore witnessedContainerMetadataStore;
 
   private List<ReplicationTask> replicationTasks;
 
   @Override
   public Void call() throws Exception {
+    try {
+      return replicate();
+    } finally {
+      if (witnessedContainerMetadataStore != null) {
+        witnessedContainerMetadataStore.close();
+      }
+    }
+  }
+
+  public Void replicate() throws Exception {
 
     OzoneConfiguration conf = createOzoneConfiguration();
 
@@ -173,8 +186,10 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
     if (fakeDatanodeUuid.isEmpty()) {
       fakeDatanodeUuid = UUID.randomUUID().toString();
     }
-
-    ContainerSet containerSet = new ContainerSet(1000);
+    WitnessedContainerMetadataStore referenceCountedDS =
+        WitnessedContainerMetadataStoreImpl.get(conf);
+    this.witnessedContainerMetadataStore = referenceCountedDS;
+    ContainerSet containerSet = new ContainerSet(referenceCountedDS.getContainerIdsTable(), 1000);
 
     ContainerMetrics metrics = ContainerMetrics.create(conf);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
