This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 07fa775  HDDS-6234. Repair containers affected by incorrect used bytes 
and block count. (#3042)
07fa775 is described below

commit 07fa7759ce2955d1d1a1838b6120af238339ff1a
Author: Ethan Rose <[email protected]>
AuthorDate: Thu Feb 17 09:42:27 2022 -0800

    HDDS-6234. Repair containers affected by incorrect used bytes and block 
count. (#3042)
---
 .../common/interfaces/ContainerInspector.java      |  72 ++++
 .../common/utils/ContainerInspectorUtil.java       |  87 ++++
 .../KeyValueContainerMetadataInspector.java        | 463 +++++++++++++++++++++
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  10 +-
 .../ozone/container/ozoneimpl/ContainerReader.java |   6 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   9 +
 .../keyvalue/TestKeyValueContainerCheck.java       | 158 +------
 ...a => TestKeyValueContainerIntegrityChecks.java} | 160 ++-----
 .../TestKeyValueContainerMetadataInspector.java    | 396 ++++++++++++++++++
 .../org/apache/ozone/test/GenericTestUtils.java    |  18 +-
 10 files changed, 1103 insertions(+), 276 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerInspector.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerInspector.java
new file mode 100644
index 0000000..5c6cab2
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerInspector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+
+/**
+ * A ContainerInspector is a tool used to log information about all
+ * containers as they are being processed during datanode startup. It could
+ * also be used to repair containers if necessary.
+ *
+ * These are primarily debug/developer utilities that will slow down datanode
+ * startup and are only meant to be run as needed.
+ */
+public interface ContainerInspector {
+  /**
+   * Loads necessary configurations to determine how/if to run the inspector
+   * when the process method is called.
+   *
+   * @return true if the inspector will operate when process is called. False
+   * otherwise.
+   */
+  boolean load();
+
+  /**
+   * Removes configurations to run the inspector, so that the process method
+   * becomes a no-op.
+   */
+  void unload();
+
+  /**
+   * Determines whether the inspector will be modifying containers as part of
+   * the process method.
+   *
+   * @return true if the inspector will only read the container, false if it
+   * will be making modifications/repairs.
+   */
+  boolean isReadOnly();
+
+  /**
+   * Operates on the container as the inspector is configured. This may
+   * involve logging information or fixing errors.
+   *
+   * Multiple containers may be processed in parallel by calling this method
+   * on the same inspector instance, but only one process call will be invoked
+   * per container at a time. Implementations must ensure that:
+   * 1. Information they log is batched so that log output from other
+   * inspectors working on other containers is not interleaved.
+   * 2. Multiple process calls to the same inspector instance with different
+   * containers are thread safe.
+   *
+   * @param data Container data for the container to process.
+   * @param store The metadata store for this container.
+   */
+  void process(ContainerData data, DatanodeStore store);
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerInspectorUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerInspectorUtil.java
new file mode 100644
index 0000000..9c900bc
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerInspectorUtil.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.utils;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerInspector;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerMetadataInspector;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+
+/**
+ * Utility class to manage container inspectors. New inspectors can be added
+ * here to have them loaded and process containers on startup.
+ */
+public final class ContainerInspectorUtil {
+
+  private static final EnumMap<ContainerProtos.ContainerType,
+          List<ContainerInspector>> INSPECTORS =
+      new EnumMap<>(ContainerProtos.ContainerType.class);
+
+  static {
+    for (ContainerProtos.ContainerType type:
+        ContainerProtos.ContainerType.values()) {
+      INSPECTORS.put(type, new ArrayList<>());
+    }
+
+    // If new inspectors need to be added, put them here mapped by the type
+    // of containers they can operate on.
+    INSPECTORS.get(ContainerProtos.ContainerType.KeyValueContainer)
+        .add(new KeyValueContainerMetadataInspector());
+  }
+
+  private ContainerInspectorUtil() { }
+
+  public static void load() {
+    for (List<ContainerInspector> inspectors: INSPECTORS.values()) {
+      for (ContainerInspector inspector: inspectors) {
+        inspector.load();
+      }
+    }
+  }
+
+  public static void unload() {
+    for (List<ContainerInspector> inspectors: INSPECTORS.values()) {
+      for (ContainerInspector inspector: inspectors) {
+        inspector.unload();
+      }
+    }
+  }
+
+  public static boolean isReadOnly(ContainerProtos.ContainerType type) {
+    boolean readOnly = true;
+    for (ContainerInspector inspector: INSPECTORS.get(type)) {
+      if (!inspector.isReadOnly()) {
+        readOnly = false;
+        break;
+      }
+    }
+    return readOnly;
+  }
+
+  public static void process(ContainerData data, DatanodeStore store) {
+    for (ContainerInspector inspector:
+        INSPECTORS.get(data.getContainerType())) {
+      inspector.process(data, store);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
new file mode 100644
index 0000000..614b63d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerInspector;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Stream;
+
+/**
+ * Container inspector for key value container metadata. It is capable of
+ * logging metadata information about a container, and repairing the metadata
+ * database values of #BLOCKCOUNT and #BYTESUSED.
+ *
+ * To enable this inspector in inspect mode, pass the java system property
+ * -Dozone.datanode.container.metadata.inspector=inspect on datanode startup.
+ * This will cause the inspector to log metadata information about all
+ * containers on datanode startup whose aggregate values of #BLOCKCOUNT and
+ * #BYTESUSED do not match the sum of their parts in the database at the
+ * ERROR level, and information about correct containers at the TRACE level.
+ * Changing the `inspect` argument to `repair` will update these aggregate
+ * values to match the database.
+ *
+ * When run, the inspector will output json to the logger named in the
+ * {@link KeyValueContainerMetadataInspector#REPORT_LOG} variable. The log4j
+ * configuration can be modified to send this output to a separate file
+ * without log information prefixes interfering with the json. For example:
+ *
+ * log4j.logger.ContainerMetadataInspectorReport=INFO,inspectorAppender
+ * log4j.appender.inspectorAppender=org.apache.log4j.FileAppender
+ * log4j.appender.inspectorAppender.File=${hadoop.log.dir}/\
+ * containerMetadataInspector.log
+ * log4j.appender.inspectorAppender.layout=org.apache.log4j.PatternLayout
+ */
+public class KeyValueContainerMetadataInspector implements ContainerInspector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyValueContainerMetadataInspector.class);
+  public static final Logger REPORT_LOG = LoggerFactory.getLogger(
+      "ContainerMetadataInspectorReport");
+
+  /**
+   * The mode to run the inspector in.
+   */
+  public enum Mode {
+    REPAIR("repair"),
+    INSPECT("inspect"),
+    OFF("off");
+
+    private final String name;
+
+    Mode(String name) {
+      this.name = name;
+    }
+
+    public String toString() {
+      return name;
+    }
+  }
+
+  public static final String SYSTEM_PROPERTY = "ozone.datanode.container" +
+      ".metadata.inspector";
+
+  private Mode mode;
+
+  public KeyValueContainerMetadataInspector() {
+    mode = Mode.OFF;
+  }
+
+  /**
+   * Reads the {@link #SYSTEM_PROPERTY} system property and sets the mode of
+   * this inspector from it. Configuration is validated here so that an
+   * invalid config value is only logged once, and not once per container.
+   *
+   * A missing or empty property turns the inspector off without logging.
+   * A value other than {@code repair} or {@code inspect} is logged as an
+   * error and also turns the inspector off.
+   *
+   * @return true if the property named a valid mode, so the inspector will
+   * operate when process is called. False otherwise.
+   */
+  @Override
+  public boolean load() {
+    String propertyValue = System.getProperty(SYSTEM_PROPERTY);
+    boolean propertyPresent =
+        (propertyValue != null && !propertyValue.isEmpty());
+    boolean propertyValid = false;
+
+    if (propertyPresent) {
+      if (propertyValue.equals(Mode.REPAIR.toString())) {
+        mode = Mode.REPAIR;
+        propertyValid = true;
+      } else if (propertyValue.equals(Mode.INSPECT.toString())) {
+        mode = Mode.INSPECT;
+        propertyValid = true;
+      }
+
+      if (propertyValid) {
+        // Note the trailing space after "Report": the two literals are
+        // concatenated into a single sentence.
+        LOG.info("Container metadata inspector enabled in {} mode. Report " +
+            "will be output to the {} log.", mode, REPORT_LOG.getName());
+      } else {
+        mode = Mode.OFF;
+        LOG.error("{} system property specified with invalid mode {}. " +
+                "Valid options are {} and {}. Container metadata inspection " +
+                "will not be run.", SYSTEM_PROPERTY, propertyValue,
+            Mode.REPAIR, Mode.INSPECT);
+      }
+    } else {
+      mode = Mode.OFF;
+    }
+
+    return propertyPresent && propertyValid;
+  }
+
+  @Override
+  public void unload() {
+    mode = Mode.OFF;
+  }
+
+  @Override
+  public boolean isReadOnly() {
+    return mode != Mode.REPAIR;
+  }
+
+  /**
+   * Inspects one container and writes a pretty-printed json report about it
+   * to {@link #REPORT_LOG}: at TRACE level when no errors were found, and at
+   * ERROR level otherwise. In repair mode, errors that are found are
+   * repaired as part of the check.
+   *
+   * This method is a no-op when the inspector's mode is off. Containers
+   * that are not {@link KeyValueContainerData} are logged as an error and
+   * skipped.
+   *
+   * @param containerData Container data for the container to process.
+   * @param store The metadata store for this container.
+   */
+  @Override
+  public void process(ContainerData containerData, DatanodeStore store) {
+    // If the system property to process container metadata was not
+    // specified, or the inspector is unloaded, this method is a no-op.
+    if (mode == Mode.OFF) {
+      return;
+    }
+
+    if (!(containerData instanceof KeyValueContainerData)) {
+      LOG.error("This inspector only works on KeyValueContainers. Inspection " +
+          "will not be run for container {}", containerData.getContainerID());
+      return;
+    }
+    KeyValueContainerData kvData = (KeyValueContainerData) containerData;
+
+    JsonObject containerJson = inspectContainer(kvData, store);
+
+    // inspectContainer logs and swallows IOExceptions, so the report may be
+    // missing sections. checkAndRepair dereferences all three of them, so it
+    // is only run on a complete report. An incomplete report is treated as
+    // incorrect and logged at ERROR instead of failing with a
+    // NullPointerException.
+    boolean inspectionComplete = containerJson.has("dBMetadata")
+        && containerJson.has("aggregates")
+        && containerJson.has("chunksDirectory");
+    boolean correct = inspectionComplete
+        && checkAndRepair(containerJson, kvData, store);
+
+    Gson gson = new GsonBuilder()
+        .setPrettyPrinting()
+        .serializeNulls()
+        .create();
+    String jsonReport = gson.toJson(containerJson);
+    if (correct) {
+      REPORT_LOG.trace(jsonReport);
+    } else {
+      REPORT_LOG.error(jsonReport);
+    }
+  }
+
+  private JsonObject inspectContainer(KeyValueContainerData containerData,
+      DatanodeStore store) {
+
+    JsonObject containerJson = new JsonObject();
+
+    try {
+      // Build top level container properties.
+      containerJson.addProperty("containerID", containerData.getContainerID());
+      String schemaVersion = containerData.getSchemaVersion();
+      containerJson.addProperty("schemaVersion", schemaVersion);
+      containerJson.addProperty("containerState",
+          containerData.getState().toString());
+      containerJson.addProperty("currentDatanodeID",
+          containerData.getVolume().getDatanodeUuid());
+      containerJson.addProperty("originDatanodeID",
+          containerData.getOriginNodeId());
+
+      // Build DB metadata values.
+      Table<String, Long> metadataTable = store.getMetadataTable();
+      JsonObject dBMetadata = getDBMetadataJson(metadataTable);
+      containerJson.add("dBMetadata", dBMetadata);
+
+      // Build aggregate values.
+      JsonObject aggregates = getAggregateValues(store, schemaVersion);
+      containerJson.add("aggregates", aggregates);
+
+      // Build info about chunks directory.
+      JsonObject chunksDirectory =
+          getChunksDirectoryJson(new File(containerData.getChunksPath()));
+      containerJson.add("chunksDirectory", chunksDirectory);
+    } catch (IOException ex) {
+      LOG.error("Inspecting container {} failed",
+          containerData.getContainerID(), ex);
+    }
+
+    return containerJson;
+  }
+
+  private JsonObject getDBMetadataJson(Table<String, Long> metadataTable)
+      throws IOException {
+    JsonObject dBMetadata = new JsonObject();
+
+    dBMetadata.addProperty(OzoneConsts.BLOCK_COUNT,
+        metadataTable.get(OzoneConsts.BLOCK_COUNT));
+    dBMetadata.addProperty(OzoneConsts.CONTAINER_BYTES_USED,
+        metadataTable.get(OzoneConsts.CONTAINER_BYTES_USED));
+    dBMetadata.addProperty(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+        metadataTable.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT));
+    dBMetadata.addProperty(OzoneConsts.DELETE_TRANSACTION_KEY,
+        metadataTable.get(OzoneConsts.DELETE_TRANSACTION_KEY));
+    dBMetadata.addProperty(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID,
+        metadataTable.get(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID));
+
+    return dBMetadata;
+  }
+
+  private JsonObject getAggregateValues(DatanodeStore store,
+      String schemaVersion) throws IOException {
+    JsonObject aggregates = new JsonObject();
+
+    long usedBytesTotal = 0;
+    long blockCountTotal = 0;
+    long pendingDeleteBlockCountTotal = 0;
+    // Count normal blocks.
+    try (BlockIterator<BlockData> blockIter =
+             store.getBlockIterator(
+                 MetadataKeyFilters.getUnprefixedKeyFilter())) {
+
+      while (blockIter.hasNext()) {
+        blockCountTotal++;
+        usedBytesTotal += getBlockLength(blockIter.nextBlock());
+      }
+    }
+
+    // Count pending delete blocks.
+    if (schemaVersion.equals(OzoneConsts.SCHEMA_V1)) {
+      try (BlockIterator<BlockData> blockIter =
+               store.getBlockIterator(
+                   MetadataKeyFilters.getDeletingKeyFilter())) {
+
+        while (blockIter.hasNext()) {
+          blockCountTotal++;
+          pendingDeleteBlockCountTotal++;
+          usedBytesTotal += getBlockLength(blockIter.nextBlock());
+        }
+      }
+    } else if (schemaVersion.equals(OzoneConsts.SCHEMA_V2)) {
+      DatanodeStoreSchemaTwoImpl schemaTwoStore =
+          (DatanodeStoreSchemaTwoImpl) store;
+      pendingDeleteBlockCountTotal =
+          countPendingDeletesSchemaV2(schemaTwoStore);
+    } else {
+      throw new IOException("Failed to process deleted blocks for unknown " +
+              "container schema " + schemaVersion);
+    }
+
+    aggregates.addProperty("blockCount", blockCountTotal);
+    aggregates.addProperty("usedBytes", usedBytesTotal);
+    aggregates.addProperty("pendingDeleteBlocks",
+        pendingDeleteBlockCountTotal);
+
+    return aggregates;
+  }
+
+  private JsonObject getChunksDirectoryJson(File chunksDir) throws IOException 
{
+    JsonObject chunksDirectory = new JsonObject();
+
+    chunksDirectory.addProperty("path", chunksDir.getAbsolutePath());
+    boolean chunksDirPresent = FileUtils.isDirectory(chunksDir);
+    chunksDirectory.addProperty("present", chunksDirPresent);
+
+    long fileCount = 0;
+    if (chunksDirPresent) {
+      try (Stream<Path> stream = Files.list(chunksDir.toPath())) {
+        fileCount = stream.count();
+      }
+    }
+    chunksDirectory.addProperty("fileCount", fileCount);
+
+    return chunksDirectory;
+  }
+
+  private boolean checkAndRepair(JsonObject parent, ContainerData 
containerData,
+      DatanodeStore store) {
+    JsonArray errors = new JsonArray();
+    boolean passed = true;
+
+    Table<String, Long> metadataTable = store.getMetadataTable();
+
+    // Check and repair block count.
+    JsonElement blockCountDB = parent.getAsJsonObject("dBMetadata")
+        .get(OzoneConsts.BLOCK_COUNT);
+
+    JsonElement blockCountAggregate = parent.getAsJsonObject("aggregates")
+        .get("blockCount");
+
+    // If block count is absent from the DB, it is only an error if there are
+    // a non-zero amount of block keys in the DB.
+    long blockCountDBLong = 0;
+    if (!blockCountDB.isJsonNull()) {
+      blockCountDBLong = blockCountDB.getAsLong();
+    }
+
+    if (blockCountDBLong != blockCountAggregate.getAsLong()) {
+      passed = false;
+
+      BooleanSupplier keyRepairAction = () -> {
+        boolean repaired = false;
+        try {
+          metadataTable.put(OzoneConsts.BLOCK_COUNT,
+              blockCountAggregate.getAsLong());
+          repaired = true;
+        } catch (IOException ex) {
+          LOG.error("Error repairing block count for container {}.",
+              containerData.getContainerID(), ex);
+        }
+        return repaired;
+      };
+
+      JsonObject blockCountError = buildErrorAndRepair("dBMetadata." +
+              OzoneConsts.BLOCK_COUNT, blockCountAggregate, blockCountDB,
+          keyRepairAction);
+      errors.add(blockCountError);
+    }
+
+    // Check and repair used bytes.
+    JsonElement usedBytesDB = parent.getAsJsonObject("dBMetadata")
+        .get(OzoneConsts.CONTAINER_BYTES_USED);
+    JsonElement usedBytesAggregate = parent.getAsJsonObject("aggregates")
+        .get("usedBytes");
+
+    // If used bytes is absent from the DB, it is only an error if there is
+    // a non-zero aggregate of used bytes among the block keys.
+    long usedBytesDBLong = 0;
+    if (!usedBytesDB.isJsonNull()) {
+      usedBytesDBLong = usedBytesDB.getAsLong();
+    }
+
+    if (usedBytesDBLong != usedBytesAggregate.getAsLong()) {
+      passed = false;
+
+      BooleanSupplier keyRepairAction = () -> {
+        boolean repaired = false;
+        try {
+          metadataTable.put(OzoneConsts.CONTAINER_BYTES_USED,
+              usedBytesAggregate.getAsLong());
+          repaired = true;
+        } catch (IOException ex) {
+          LOG.error("Error repairing used bytes for container {}.",
+              containerData.getContainerID(), ex);
+        }
+        return repaired;
+      };
+
+      JsonObject usedBytesError = buildErrorAndRepair("dBMetadata." +
+              OzoneConsts.CONTAINER_BYTES_USED, usedBytesAggregate, 
usedBytesDB,
+          keyRepairAction);
+      errors.add(usedBytesError);
+    }
+
+    // check and repair chunks dir.
+    JsonElement chunksDirPresent = parent.getAsJsonObject("chunksDirectory")
+        .get("present");
+    if (!chunksDirPresent.getAsBoolean()) {
+      passed = false;
+
+      BooleanSupplier dirRepairAction = () -> {
+        boolean repaired = false;
+        try {
+          File chunksDir = new File(containerData.getChunksPath());
+          Files.createDirectories(chunksDir.toPath());
+          repaired = true;
+        } catch (IOException ex) {
+          LOG.error("Error recreating empty chunks directory for container 
{}.",
+              containerData.getContainerID(), ex);
+        }
+        return repaired;
+      };
+
+      JsonObject chunksDirError = 
buildErrorAndRepair("chunksDirectory.present",
+          new JsonPrimitive(true), chunksDirPresent, dirRepairAction);
+      errors.add(chunksDirError);
+    }
+
+    parent.addProperty("correct", passed);
+    parent.add("errors", errors);
+    return passed;
+  }
+
+  private JsonObject buildErrorAndRepair(String property, JsonElement expected,
+      JsonElement actual, BooleanSupplier repairAction) {
+    JsonObject error = new JsonObject();
+    error.addProperty("property", property);
+    error.add("expected", expected);
+    error.add("actual", actual);
+
+    boolean repaired = false;
+    if (mode == Mode.REPAIR) {
+      repaired = repairAction.getAsBoolean();
+    }
+    error.addProperty("repaired", repaired);
+
+    return error;
+  }
+
+  private long countPendingDeletesSchemaV2(DatanodeStoreSchemaTwoImpl
+      schemaTwoStore) throws IOException {
+    long pendingDeleteBlockCountTotal = 0;
+    Table<Long, DeletedBlocksTransaction> delTxTable =
+        schemaTwoStore.getDeleteTransactionTable();
+    try (TableIterator<Long, ? extends Table.KeyValue<Long,
+        DeletedBlocksTransaction>> iterator = delTxTable.iterator()) {
+      while (iterator.hasNext()) {
+        DeletedBlocksTransaction txn = iterator.next().getValue();
+        // In schema 2, pending delete blocks are stored in the
+        // transaction object. Since the actual blocks still exist in the
+        // block data table with no prefix, they have already been
+        // counted towards bytes used and total block count above.
+        pendingDeleteBlockCountTotal += txn.getLocalIDList().size();
+      }
+    }
+
+    return pendingDeleteBlockCountTotal;
+  }
+
+  private static long getBlockLength(BlockData block) {
+    long blockLen = 0;
+    List<ContainerProtos.ChunkInfo> chunkInfoList = block.getChunks();
+
+    for (ContainerProtos.ChunkInfo chunk : chunkInfoList) {
+      blockLen += chunk.getLen();
+    }
+
+    return blockLen;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index f984b20..72cc8c3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import com.google.common.base.Preconditions;
@@ -189,8 +190,10 @@ public final class KeyValueContainerUtil {
     DatanodeStore store = null;
     try {
       try {
+        boolean readOnly = ContainerInspectorUtil.isReadOnly(
+            ContainerProtos.ContainerType.KeyValueContainer);
         store = BlockUtils.getUncachedDatanodeStore(
-            kvContainerData, config, true);
+            kvContainerData, config, readOnly);
       } catch (IOException e) {
         // If an exception is thrown, then it may indicate the RocksDB is
         // already open in the container cache. As this code is only executed 
at
@@ -254,6 +257,11 @@ public final class KeyValueContainerUtil {
       if (!isBlockMetadataSet) {
         initializeUsedBytesAndBlockCount(store, kvContainerData);
       }
+
+      // Run advanced container inspection/repair operations if specified on
+      // startup. If this method is called but not as a part of startup,
+      // the inspectors will be unloaded and this will be a no-op.
+      ContainerInspectorUtil.process(kvContainerData, store);
     } finally {
       if (cachedDB != null) {
         // If we get a cached instance, calling close simply decrements the
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 44aa92c..2a88a2f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -190,9 +190,9 @@ public class ContainerReader implements Runnable {
   }
 
   /**
-   * verify ContainerData loaded from disk and fix-up stale members.
-   * Specifically blockCommitSequenceId, delete related metadata
-   * and bytesUsed
+   * Verify ContainerData loaded from disk and fix-up stale members.
+   * Specifically the in memory values of blockCommitSequenceId, delete related
+   * metadata, bytesUsed and block count.
    * @param containerData
    * @throws IOException
    */
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5eaa01e..4a6dece 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -52,6 +52,7 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -221,6 +222,10 @@ public class OzoneContainer {
     ArrayList<Thread> volumeThreads = new ArrayList<>();
     long startTime = System.currentTimeMillis();
 
+    // Load container inspectors that may be triggered at startup based on
+    // system properties set. These can inspect and possibly repair
+    // containers as we iterate them here.
+    ContainerInspectorUtil.load();
     //TODO: diskchecker should be run before this, to see how disks are.
     // And also handle disk failure tolerance need to be added
     while (volumeSetIterator.hasNext()) {
@@ -240,6 +245,10 @@ public class OzoneContainer {
       Thread.currentThread().interrupt();
     }
 
+    // After all containers have been processed, turn off container
+    // inspectors so they are not hit during normal datanode execution.
+    ContainerInspectorUtil.unload();
+
     LOG.info("Build ContainerSet costs {}s",
         (System.currentTimeMillis() - startTime) / 1000);
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index d50a091..99812f3 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -18,50 +18,24 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
-import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
-import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
-import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
@@ -69,46 +43,13 @@ import static org.junit.Assert.assertFalse;
 /**
  * Basic sanity test for the KeyValueContainerCheck class.
  */
-@RunWith(Parameterized.class) public class TestKeyValueContainerCheck {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestKeyValueContainerCheck.class);
-
-  private final ContainerLayoutTestInfo chunkManagerTestInfo;
-  private KeyValueContainer container;
-  private KeyValueContainerData containerData;
-  private MutableVolumeSet volumeSet;
-  private OzoneConfiguration conf;
-  private File testRoot;
-  private ChunkManager chunkManager;
+@RunWith(Parameterized.class)
+public class TestKeyValueContainerCheck
+    extends TestKeyValueContainerIntegrityChecks {
 
-  public TestKeyValueContainerCheck(
-      ContainerLayoutTestInfo chunkManagerTestInfo) {
-    this.chunkManagerTestInfo = chunkManagerTestInfo;
-  }
-
-  @Parameterized.Parameters public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] {
-        {ContainerLayoutTestInfo.FILE_PER_CHUNK},
-        {ContainerLayoutTestInfo.FILE_PER_BLOCK}
-    });
-  }
-
-  @Before public void setUp() throws Exception {
-    LOG.info("Testing  layout:{}", chunkManagerTestInfo.getLayout());
-    this.testRoot = GenericTestUtils.getRandomizedTestDir();
-    conf = new OzoneConfiguration();
-    conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
-    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
-    chunkManagerTestInfo.updateConfig(conf);
-    volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
-        StorageVolume.VolumeType.DATA_VOLUME, null);
-    chunkManager = chunkManagerTestInfo.createChunkManager(true, null);
-  }
-
-  @After public void teardown() {
-    volumeSet.shutdown();
-    FileUtil.fullyDelete(testRoot);
+  public TestKeyValueContainerCheck(ContainerLayoutTestInfo
+      containerLayoutTestInfo) {
+    super(containerLayoutTestInfo);
   }
 
   /**
@@ -119,13 +60,14 @@ import static org.junit.Assert.assertFalse;
     long containerID = 101;
     int deletedBlocks = 1;
     int normalBlocks = 3;
-    int chunksPerBlock = 4;
+    OzoneConfiguration conf = getConf();
     ContainerScrubberConfiguration c = conf.getObject(
         ContainerScrubberConfiguration.class);
 
     // test Closed Container
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks,
-        chunksPerBlock);
+    KeyValueContainer container = createContainerWithBlocks(containerID,
+        normalBlocks, deletedBlocks);
+    KeyValueContainerData containerData = container.getContainerData();
 
     KeyValueContainerCheck kvCheck =
         new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
@@ -151,13 +93,14 @@ import static org.junit.Assert.assertFalse;
     long containerID = 102;
     int deletedBlocks = 1;
     int normalBlocks = 3;
-    int chunksPerBlock = 4;
+    OzoneConfiguration conf = getConf();
     ContainerScrubberConfiguration sc = conf.getObject(
         ContainerScrubberConfiguration.class);
 
     // test Closed Container
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks,
-        chunksPerBlock);
+    KeyValueContainer container = createContainerWithBlocks(containerID,
+        normalBlocks, deletedBlocks);
+    KeyValueContainerData containerData = container.getContainerData();
 
     container.close();
 
@@ -178,7 +121,7 @@ import static org.junit.Assert.assertFalse;
       ContainerProtos.ChunkInfo c = block.getChunks().get(0);
       BlockID blockID = block.getBlockID();
       ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(c);
-      File chunkFile = chunkManagerTestInfo.getLayout()
+      File chunkFile = getChunkLayout()
           .getChunkFile(containerData, blockID, chunkInfo);
       long length = chunkFile.length();
       assertTrue(length > 0);
@@ -198,75 +141,4 @@ import static org.junit.Assert.assertFalse;
             sc.getBandwidthPerVolume()), null);
     assertFalse(valid);
   }
-
-  /**
-   * Creates a container with normal and deleted blocks.
-   * First it will insert normal blocks, and then it will insert
-   * deleted blocks.
-   */
-  private void createContainerWithBlocks(long containerId, int normalBlocks,
-      int deletedBlocks, int chunksPerBlock) throws Exception {
-    String strBlock = "block";
-    String strChunk = "-chunkFile";
-    long totalBlocks = normalBlocks + deletedBlocks;
-    int unitLen = 1024;
-    int chunkLen = 3 * unitLen;
-    int bytesPerChecksum = 2 * unitLen;
-    Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
-        bytesPerChecksum);
-    byte[] chunkData = RandomStringUtils.randomAscii(chunkLen).getBytes(UTF_8);
-    ChecksumData checksumData = checksum.computeChecksum(chunkData);
-    DispatcherContext writeStage = new DispatcherContext.Builder()
-        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
-        .build();
-    DispatcherContext commitStage = new DispatcherContext.Builder()
-        .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
-        .build();
-
-    containerData = new KeyValueContainerData(containerId,
-        chunkManagerTestInfo.getLayout(),
-        (long) chunksPerBlock * chunkLen * totalBlocks,
-        UUID.randomUUID().toString(), UUID.randomUUID().toString());
-    container = new KeyValueContainer(containerData, conf);
-    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
-        UUID.randomUUID().toString());
-    try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
-        conf)) {
-      assertNotNull(containerData.getChunksPath());
-      File chunksPath = new File(containerData.getChunksPath());
-      chunkManagerTestInfo.validateFileCount(chunksPath, 0, 0);
-
-      List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
-      for (int i = 0; i < totalBlocks; i++) {
-        BlockID blockID = new BlockID(containerId, i);
-        BlockData blockData = new BlockData(blockID);
-
-        chunkList.clear();
-        for (long chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
-          String chunkName = strBlock + i + strChunk + chunkCount;
-          long offset = chunkCount * chunkLen;
-          ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
-          info.setChecksumData(checksumData);
-          chunkList.add(info.getProtoBufMessage());
-          chunkManager.writeChunk(container, blockID, info,
-              ByteBuffer.wrap(chunkData), writeStage);
-          chunkManager.writeChunk(container, blockID, info,
-              ByteBuffer.wrap(chunkData), commitStage);
-        }
-        blockData.setChunks(chunkList);
-
-        // normal key
-        String key = Long.toString(blockID.getLocalID());
-        if (i >= normalBlocks) {
-          // deleted key
-          key = OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
-        }
-        metadataStore.getStore().getBlockDataTable().put(key, blockData);
-      }
-
-      chunkManagerTestInfo.validateFileCount(chunksPath, totalBlocks,
-          totalBlocks * chunksPerBlock);
-    }
-  }
-
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
similarity index 52%
copy from 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
copy to 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
index d50a091..63bf5d6 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerIntegrityChecks.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import org.apache.commons.lang3.RandomStringUtils;
@@ -23,34 +22,28 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
-import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
-import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScrubberConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.RandomAccessFile;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.nio.ByteBuffer;
@@ -60,31 +53,29 @@ import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
 
 /**
- * Basic sanity test for the KeyValueContainerCheck class.
+ * Base class for tests identifying issues with key value container contents.
  */
-@RunWith(Parameterized.class) public class TestKeyValueContainerCheck {
+public class TestKeyValueContainerIntegrityChecks {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestKeyValueContainerCheck.class);
+      LoggerFactory.getLogger(TestKeyValueContainerIntegrityChecks.class);
 
-  private final ContainerLayoutTestInfo chunkManagerTestInfo;
-  private KeyValueContainer container;
-  private KeyValueContainerData containerData;
+  private final ContainerLayoutTestInfo containerLayoutTestInfo;
   private MutableVolumeSet volumeSet;
   private OzoneConfiguration conf;
   private File testRoot;
   private ChunkManager chunkManager;
 
-  public TestKeyValueContainerCheck(
-      ContainerLayoutTestInfo chunkManagerTestInfo) {
-    this.chunkManagerTestInfo = chunkManagerTestInfo;
+  protected static final int UNIT_LEN = 1024;
+  protected static final int CHUNK_LEN = 3 * UNIT_LEN;
+  protected static final int CHUNKS_PER_BLOCK = 4;
+
+  public TestKeyValueContainerIntegrityChecks(ContainerLayoutTestInfo
+      containerLayoutTestInfo) {
+    this.containerLayoutTestInfo = containerLayoutTestInfo;
   }
 
   @Parameterized.Parameters public static Collection<Object[]> data() {
@@ -95,15 +86,15 @@ import static org.junit.Assert.assertFalse;
   }
 
   @Before public void setUp() throws Exception {
-    LOG.info("Testing  layout:{}", chunkManagerTestInfo.getLayout());
+    LOG.info("Testing  layout:{}", containerLayoutTestInfo.getLayout());
     this.testRoot = GenericTestUtils.getRandomizedTestDir();
     conf = new OzoneConfiguration();
     conf.set(HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
     conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testRoot.getAbsolutePath());
-    chunkManagerTestInfo.updateConfig(conf);
+    containerLayoutTestInfo.updateConfig(conf);
     volumeSet = new MutableVolumeSet(UUID.randomUUID().toString(), conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
-    chunkManager = chunkManagerTestInfo.createChunkManager(true, null);
+    chunkManager = containerLayoutTestInfo.createChunkManager(true, null);
   }
 
   @After public void teardown() {
@@ -111,110 +102,29 @@ import static org.junit.Assert.assertFalse;
     FileUtil.fullyDelete(testRoot);
   }
 
-  /**
-   * Sanity test, when there are no corruptions induced.
-   */
-  @Test
-  public void testKeyValueContainerCheckNoCorruption() throws Exception {
-    long containerID = 101;
-    int deletedBlocks = 1;
-    int normalBlocks = 3;
-    int chunksPerBlock = 4;
-    ContainerScrubberConfiguration c = conf.getObject(
-        ContainerScrubberConfiguration.class);
-
-    // test Closed Container
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks,
-        chunksPerBlock);
-
-    KeyValueContainerCheck kvCheck =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID);
-
-    // first run checks on a Open Container
-    boolean valid = kvCheck.fastCheck();
-    assertTrue(valid);
-
-    container.close();
-
-    // next run checks on a Closed Container
-    valid = kvCheck.fullCheck(new DataTransferThrottler(
-        c.getBandwidthPerVolume()), null);
-    assertTrue(valid);
+  protected ContainerLayoutVersion getChunkLayout() {
+    return containerLayoutTestInfo.getLayout();
   }
 
-  /**
-   * Sanity test, when there are corruptions induced.
-   */
-  @Test
-  public void testKeyValueContainerCheckCorruption() throws Exception {
-    long containerID = 102;
-    int deletedBlocks = 1;
-    int normalBlocks = 3;
-    int chunksPerBlock = 4;
-    ContainerScrubberConfiguration sc = conf.getObject(
-        ContainerScrubberConfiguration.class);
-
-    // test Closed Container
-    createContainerWithBlocks(containerID, normalBlocks, deletedBlocks,
-        chunksPerBlock);
-
-    container.close();
-
-    KeyValueContainerCheck kvCheck =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID);
-
-    File metaDir = new File(containerData.getMetadataPath());
-    File dbFile = KeyValueContainerLocationUtil
-        .getContainerDBFile(metaDir, containerID);
-    containerData.setDbFile(dbFile);
-    try (ReferenceCountedDB ignored =
-            BlockUtils.getDB(containerData, conf);
-        BlockIterator<BlockData> kvIter =
-                ignored.getStore().getBlockIterator()) {
-      BlockData block = kvIter.nextBlock();
-      assertFalse(block.getChunks().isEmpty());
-      ContainerProtos.ChunkInfo c = block.getChunks().get(0);
-      BlockID blockID = block.getBlockID();
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(c);
-      File chunkFile = chunkManagerTestInfo.getLayout()
-          .getChunkFile(containerData, blockID, chunkInfo);
-      long length = chunkFile.length();
-      assertTrue(length > 0);
-      // forcefully truncate the file to induce failure.
-      try (RandomAccessFile file = new RandomAccessFile(chunkFile, "rws")) {
-        file.setLength(length / 2);
-      }
-      assertEquals(length / 2, chunkFile.length());
-    }
-
-    // metadata check should pass.
-    boolean valid = kvCheck.fastCheck();
-    assertTrue(valid);
-
-    // checksum validation should fail.
-    valid = kvCheck.fullCheck(new DataTransferThrottler(
-            sc.getBandwidthPerVolume()), null);
-    assertFalse(valid);
+  protected OzoneConfiguration getConf() {
+    return conf;
   }
 
+
   /**
    * Creates a container with normal and deleted blocks.
    * First it will insert normal blocks, and then it will insert
    * deleted blocks.
    */
-  private void createContainerWithBlocks(long containerId, int normalBlocks,
-      int deletedBlocks, int chunksPerBlock) throws Exception {
+  protected KeyValueContainer createContainerWithBlocks(long containerId,
+      int normalBlocks, int deletedBlocks) throws Exception {
     String strBlock = "block";
     String strChunk = "-chunkFile";
     long totalBlocks = normalBlocks + deletedBlocks;
-    int unitLen = 1024;
-    int chunkLen = 3 * unitLen;
-    int bytesPerChecksum = 2 * unitLen;
+    int bytesPerChecksum = 2 * UNIT_LEN;
     Checksum checksum = new Checksum(ContainerProtos.ChecksumType.SHA256,
         bytesPerChecksum);
-    byte[] chunkData = RandomStringUtils.randomAscii(chunkLen).getBytes(UTF_8);
+    byte[] chunkData = RandomStringUtils.randomAscii(CHUNK_LEN).getBytes(UTF_8);
     ChecksumData checksumData = checksum.computeChecksum(chunkData);
     DispatcherContext writeStage = new DispatcherContext.Builder()
         .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
@@ -223,18 +133,18 @@ import static org.junit.Assert.assertFalse;
         .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
         .build();
 
-    containerData = new KeyValueContainerData(containerId,
-        chunkManagerTestInfo.getLayout(),
-        (long) chunksPerBlock * chunkLen * totalBlocks,
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        containerLayoutTestInfo.getLayout(),
+        (long) CHUNKS_PER_BLOCK * CHUNK_LEN * totalBlocks,
         UUID.randomUUID().toString(), UUID.randomUUID().toString());
-    container = new KeyValueContainer(containerData, conf);
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
     container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
         UUID.randomUUID().toString());
     try (ReferenceCountedDB metadataStore = BlockUtils.getDB(containerData,
         conf)) {
       assertNotNull(containerData.getChunksPath());
       File chunksPath = new File(containerData.getChunksPath());
-      chunkManagerTestInfo.validateFileCount(chunksPath, 0, 0);
+      containerLayoutTestInfo.validateFileCount(chunksPath, 0, 0);
 
       List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
       for (int i = 0; i < totalBlocks; i++) {
@@ -242,10 +152,10 @@ import static org.junit.Assert.assertFalse;
         BlockData blockData = new BlockData(blockID);
 
         chunkList.clear();
-        for (long chunkCount = 0; chunkCount < chunksPerBlock; chunkCount++) {
+        for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) {
           String chunkName = strBlock + i + strChunk + chunkCount;
-          long offset = chunkCount * chunkLen;
-          ChunkInfo info = new ChunkInfo(chunkName, offset, chunkLen);
+          long offset = chunkCount * CHUNK_LEN;
+          ChunkInfo info = new ChunkInfo(chunkName, offset, CHUNK_LEN);
           info.setChecksumData(checksumData);
           chunkList.add(info.getProtoBufMessage());
           chunkManager.writeChunk(container, blockID, info,
@@ -264,9 +174,11 @@ import static org.junit.Assert.assertFalse;
         metadataStore.getStore().getBlockDataTable().put(key, blockData);
       }
 
-      chunkManagerTestInfo.validateFileCount(chunksPath, totalBlocks,
-          totalBlocks * chunksPerBlock);
+      containerLayoutTestInfo.validateFileCount(chunksPath, totalBlocks,
+          totalBlocks * CHUNKS_PER_BLOCK);
     }
+
+    return container;
   }
 
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
new file mode 100644
index 0000000..16e3f2d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerInspector;
+import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.log4j.PatternLayout;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Tests for {@link KeyValueContainerMetadataInspector}.
+ */
+@RunWith(Parameterized.class)
+public class TestKeyValueContainerMetadataInspector
+    extends TestKeyValueContainerIntegrityChecks {
+  private static final long CONTAINER_ID = 102;
+
+  public TestKeyValueContainerMetadataInspector(ContainerLayoutTestInfo
+      containerLayoutTestInfo) {
+    super(containerLayoutTestInfo);
+  }
+
+  @Test
+  public void testRunDisabled() throws Exception {
+    // Create incorrect container.
+    KeyValueContainer container = createClosedContainer(3);
+    KeyValueContainerData containerData = container.getContainerData();
+    setDBBlockAndByteCounts(containerData, -2, -2);
+
+    // No system property set. Should not run.
+    System.clearProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY);
+    ContainerInspectorUtil.load();
+    Assert.assertNull(runInspectorAndGetReport(containerData));
+    ContainerInspectorUtil.unload();
+
+    // Unloaded. Should not run even with system property.
+    System.setProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY,
+        KeyValueContainerMetadataInspector.Mode.INSPECT.toString());
+    Assert.assertNull(runInspectorAndGetReport(containerData));
+
+    // Unloaded and no system property. Should not run.
+    System.clearProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY);
+    Assert.assertNull(runInspectorAndGetReport(containerData));
+  }
+
+  @Test
+  public void testSystemPropertyAndReadOnly() {
+    System.clearProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY);
+    ContainerInspector inspector = new KeyValueContainerMetadataInspector();
+    Assert.assertFalse(inspector.load());
+    Assert.assertTrue(inspector.isReadOnly());
+
+    // Inspect mode: valid argument and readonly.
+    System.setProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY,
+        KeyValueContainerMetadataInspector.Mode.INSPECT.toString());
+    inspector = new KeyValueContainerMetadataInspector();
+    Assert.assertTrue(inspector.load());
+    Assert.assertTrue(inspector.isReadOnly());
+
+    // Repair mode: valid argument and not readonly.
+    System.setProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY,
+        KeyValueContainerMetadataInspector.Mode.REPAIR.toString());
+    inspector = new KeyValueContainerMetadataInspector();
+    Assert.assertTrue(inspector.load());
+    Assert.assertFalse(inspector.isReadOnly());
+
+    // Bad argument: invalid argument and readonly.
+    System.setProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY,
+        "badvalue");
+    inspector = new KeyValueContainerMetadataInspector();
+    Assert.assertFalse(inspector.load());
+    Assert.assertTrue(inspector.isReadOnly());
+
+    // Clean slate for other tests.
+    System.clearProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY);
+  }
+
+  @Test
+  public void testMissingChunksDir() throws Exception {
+    // Create container with missing chunks dir.
+    // The metadata in the DB will not be set in this fake container.
+    KeyValueContainer container = createClosedContainer(0);
+    KeyValueContainerData containerData = container.getContainerData();
+    String chunksDirStr = containerData.getChunksPath();
+    File chunksDirFile = new File(chunksDirStr);
+    FileUtils.deleteDirectory(chunksDirFile);
+    Assert.assertFalse(chunksDirFile.exists());
+
+    // In inspect mode, missing chunks dir should be detected but not fixed.
+    JsonObject inspectJson = runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.INSPECT);
+    // The block count and used bytes should be null in this container, but
+    // because it has no block keys that should not be an error.
+    Assert.assertEquals(1,
+        inspectJson.getAsJsonArray("errors").size());
+    checkJsonErrorsReport(inspectJson, "chunksDirectory.present",
+        new JsonPrimitive(true), new JsonPrimitive(false), false);
+    Assert.assertFalse(chunksDirFile.exists());
+
+    // In repair mode, missing chunks dir should be detected and fixed.
+    JsonObject repairJson = runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.REPAIR);
+    Assert.assertEquals(1,
+        repairJson.getAsJsonArray("errors").size());
+    checkJsonErrorsReport(repairJson, "chunksDirectory.present",
+        new JsonPrimitive(true), new JsonPrimitive(false), true);
+    Assert.assertTrue(chunksDirFile.exists());
+    Assert.assertTrue(chunksDirFile.isDirectory());
+  }
+
+  @Test
+  public void testIncorrectTotalsNoData() throws Exception {
+    int createBlocks = 0;
+    int setBlocks = -3;
+    int setBytes = -2;
+
+    KeyValueContainer container = createClosedContainer(createBlocks);
+    setDBBlockAndByteCounts(container.getContainerData(), setBlocks, setBytes);
+    inspectThenRepairOnIncorrectContainer(container.getContainerData(),
+        createBlocks, setBlocks, setBytes);
+  }
+
+  @Test
+  public void testIncorrectTotalsWithData() throws Exception {
+    int createBlocks = 3;
+    int setBlocks = 4;
+    int setBytes = -2;
+
+    // Make sure it runs on open containers too.
+    KeyValueContainer container = createOpenContainer(createBlocks);
+    setDBBlockAndByteCounts(container.getContainerData(), setBlocks, setBytes);
+    inspectThenRepairOnIncorrectContainer(container.getContainerData(),
+        createBlocks, setBlocks, setBytes);
+  }
+
+  @Test
+  public void testCorrectTotalsNoData() throws Exception {
+    int createBlocks = 0;
+    int setBytes = 0;
+
+    KeyValueContainer container = createClosedContainer(createBlocks);
+    setDBBlockAndByteCounts(container.getContainerData(), createBlocks,
+        setBytes);
+    inspectThenRepairOnCorrectContainer(container.getContainerData());
+  }
+
+  @Test
+  public void testCorrectTotalsWithData() throws Exception {
+    int createBlocks = 3;
+    int setBytes = CHUNK_LEN * CHUNKS_PER_BLOCK * createBlocks;
+
+    KeyValueContainer container = createClosedContainer(createBlocks);
+    setDBBlockAndByteCounts(container.getContainerData(), createBlocks,
+        setBytes);
+    inspectThenRepairOnCorrectContainer(container.getContainerData());
+  }
+
+  public void inspectThenRepairOnCorrectContainer(
+      KeyValueContainerData containerData) throws Exception {
+    // No output for correct containers.
+    Assert.assertNull(runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.INSPECT));
+
+    Assert.assertNull(runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.REPAIR));
+  }
+
+  /**
+   * Runs the inspector on an existing container in inspect mode, then in
+   * repair mode, and checks the output and the DB totals after each run.
+   *
+   * @param containerData Container whose DB totals were set incorrectly.
+   * @param createdBlocks Number of blocks created in the container.
+   * @param setBlocks total block count value set in the database.
+   * @param setBytes total used bytes value set in the database.
+   */
+  public void inspectThenRepairOnIncorrectContainer(
+      KeyValueContainerData containerData, int createdBlocks, int setBlocks,
+      int setBytes) throws Exception {
+    int createdBytes = CHUNK_LEN * CHUNKS_PER_BLOCK * createdBlocks;
+    int createdFiles = 0;
+    switch (getChunkLayout()) {
+    case FILE_PER_BLOCK:
+      createdFiles = createdBlocks;
+      break;
+    case FILE_PER_CHUNK:
+      createdFiles = createdBlocks * CHUNKS_PER_BLOCK;
+      break;
+    default:
+      Assert.fail("Unrecognized chunk layout version.");
+    }
+
+    String containerState = containerData.getState().toString();
+
+    // First inspect the container.
+    JsonObject inspectJson = runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.INSPECT);
+
+    checkJsonReportForIncorrectContainer(inspectJson,
+        containerState, createdBlocks, setBlocks, createdBytes, setBytes,
+        createdFiles, false);
+    // Container should not have been modified in inspect mode.
+    checkDBBlockAndByteCounts(containerData, setBlocks, setBytes);
+
+    // Now repair the container.
+    JsonObject repairJson = runInspectorAndGetReport(containerData,
+        KeyValueContainerMetadataInspector.Mode.REPAIR);
+    checkJsonReportForIncorrectContainer(repairJson,
+        containerState, createdBlocks, setBlocks, createdBytes, setBytes,
+        createdFiles, true);
+    // Metadata keys should have been fixed.
+    checkDBBlockAndByteCounts(containerData, createdBlocks, createdBytes);
+  }
+
+  /**
+   * Verifies each section of the JSON report produced for a container whose
+   * DB block count and used bytes are both expected to be reported as
+   * errors.
+   *
+   * @param inspectJson report emitted by the inspector.
+   * @param expectedContainerState expected "containerState" value.
+   * @param createdBlocks expected aggregate block count, and the expected
+   *     (correct) value in the block count error.
+   * @param setBlocks expected DB block count, and the actual (wrong) value
+   *     in the block count error.
+   * @param createdBytes expected aggregate used bytes, and the expected
+   *     (correct) value in the used bytes error.
+   * @param setBytes expected DB used bytes, and the actual (wrong) value in
+   *     the used bytes error.
+   * @param createdFiles expected file count of the chunks directory.
+   * @param shouldRepair expected "repaired" flag of both error entries.
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private void checkJsonReportForIncorrectContainer(JsonObject inspectJson,
+      String expectedContainerState, long createdBlocks,
+      long setBlocks, long createdBytes, long setBytes, long createdFiles,
+      boolean shouldRepair) {
+    // Check main container properties.
+    // Expected value goes first so failure messages label the values
+    // correctly.
+    Assert.assertEquals(CONTAINER_ID,
+        inspectJson.get("containerID").getAsLong());
+    Assert.assertEquals(expectedContainerState,
+        inspectJson.get("containerState").getAsString());
+
+    // Check DB metadata.
+    JsonObject jsonDbMetadata = inspectJson.getAsJsonObject("dBMetadata");
+    Assert.assertEquals(setBlocks,
+        jsonDbMetadata.get(OzoneConsts.BLOCK_COUNT).getAsLong());
+    Assert.assertEquals(setBytes,
+        jsonDbMetadata.get(OzoneConsts.CONTAINER_BYTES_USED).getAsLong());
+
+    // Check aggregate metadata values.
+    JsonObject jsonAggregates = inspectJson.getAsJsonObject("aggregates");
+    Assert.assertEquals(createdBlocks,
+        jsonAggregates.get("blockCount").getAsLong());
+    Assert.assertEquals(createdBytes,
+        jsonAggregates.get("usedBytes").getAsLong());
+    Assert.assertEquals(0,
+        jsonAggregates.get("pendingDeleteBlocks").getAsLong());
+
+    // Check chunks directory.
+    JsonObject jsonChunksDir = inspectJson.getAsJsonObject("chunksDirectory");
+    Assert.assertTrue(jsonChunksDir.get("present").getAsBoolean());
+    Assert.assertEquals(createdFiles,
+        jsonChunksDir.get("fileCount").getAsLong());
+
+    // Check errors.
+    checkJsonErrorsReport(inspectJson, "dBMetadata.#BLOCKCOUNT",
+        new JsonPrimitive(createdBlocks), new JsonPrimitive(setBlocks),
+        shouldRepair);
+    checkJsonErrorsReport(inspectJson, "dBMetadata.#BYTESUSED",
+        new JsonPrimitive(createdBytes), new JsonPrimitive(setBytes),
+        shouldRepair);
+  }
+
+  /**
+   * Checks the error list in the provided JSON report for an error matching
+   * the template passed in with the parameters.
+   * Fails if the report is marked correct, if it has no "errors" array, or
+   * if no error entry exists for the property. Only the first entry for the
+   * property is checked.
+   *
+   * @param jsonReport report emitted by the inspector.
+   * @param propertyValue value of the "property" field to look for.
+   * @param correctExpected required "expected" field of the matching entry.
+   * @param correctActual required "actual" field of the matching entry.
+   * @param correctRepair required "repaired" field of the matching entry.
+   */
+  private void checkJsonErrorsReport(JsonObject jsonReport,
+      String propertyValue, JsonPrimitive correctExpected,
+      JsonPrimitive correctActual, boolean correctRepair) {
+
+    Assert.assertFalse(jsonReport.get("correct").getAsBoolean());
+
+    JsonArray jsonErrors = jsonReport.getAsJsonArray("errors");
+    // Fail with a clear message instead of a NullPointerException below.
+    Assert.assertNotNull("Report has no \"errors\" array.", jsonErrors);
+
+    boolean matchFound = false;
+    for (JsonElement jsonErrorElem: jsonErrors) {
+      JsonObject jsonErrorObject = jsonErrorElem.getAsJsonObject();
+      String thisProperty =
+          jsonErrorObject.get("property").getAsString();
+
+      if (thisProperty.equals(propertyValue)) {
+        matchFound = true;
+
+        JsonPrimitive expectedJsonPrim =
+            jsonErrorObject.get("expected").getAsJsonPrimitive();
+        Assert.assertEquals(correctExpected, expectedJsonPrim);
+
+        JsonPrimitive actualJsonPrim =
+            jsonErrorObject.get("actual").getAsJsonPrimitive();
+        Assert.assertEquals(correctActual, actualJsonPrim);
+
+        boolean repaired =
+            jsonErrorObject.get("repaired").getAsBoolean();
+        Assert.assertEquals(correctRepair, repaired);
+        break;
+      }
+    }
+
+    Assert.assertTrue("No error reported for property \"" + propertyValue +
+        "\"", matchFound);
+  }
+
+  /**
+   * Overwrites the block count and used bytes entries in the metadata table
+   * of the container's DB.
+   *
+   * @param containerData metadata identifying the container whose DB is
+   *     modified.
+   * @param blockCount value stored under the block count key.
+   * @param byteCount value stored under the used bytes key.
+   */
+  public void setDBBlockAndByteCounts(KeyValueContainerData containerData,
+      long blockCount, long byteCount) throws Exception {
+    try (ReferenceCountedDB dbHandle =
+        BlockUtils.getDB(containerData, getConf())) {
+      // Only the values in the DB are changed; in memory state is not
+      // touched.
+      Table<String, Long> metadata = dbHandle.getStore().getMetadataTable();
+      metadata.put(OzoneConsts.BLOCK_COUNT, blockCount);
+      metadata.put(OzoneConsts.CONTAINER_BYTES_USED, byteCount);
+    }
+  }
+
+  /**
+   * Reads the used bytes and block count entries from the metadata table of
+   * the container's DB and asserts that they equal the expected values.
+   *
+   * @param containerData metadata identifying the container whose DB is
+   *     read.
+   * @param expectedBlockCount value required under the block count key.
+   * @param expectedBytesUsed value required under the used bytes key.
+   */
+  public void checkDBBlockAndByteCounts(KeyValueContainerData containerData,
+      long expectedBlockCount, long expectedBytesUsed) throws Exception {
+    try (ReferenceCountedDB dbHandle =
+        BlockUtils.getDB(containerData, getConf())) {
+      Table<String, Long> metadata = dbHandle.getStore().getMetadataTable();
+
+      // Used bytes are verified first, then the block count.
+      long storedBytes = metadata.get(OzoneConsts.CONTAINER_BYTES_USED);
+      Assert.assertEquals(expectedBytesUsed, storedBytes);
+
+      long storedBlocks = metadata.get(OzoneConsts.BLOCK_COUNT);
+      Assert.assertEquals(expectedBlockCount, storedBlocks);
+    }
+  }
+
+  /**
+   * Enables the metadata inspector in the given mode through its system
+   * property, loads the container inspectors, runs the inspection and
+   * returns the resulting report.
+   * The inspectors are unloaded and the system property is cleared before
+   * returning, even when the inspection throws, so that this global state
+   * does not carry over into other tests.
+   *
+   * @param containerData metadata of the container to inspect.
+   * @param mode inspector mode written to the system property.
+   * @return the report parsed from the inspector's log output.
+   */
+  private JsonObject runInspectorAndGetReport(
+      KeyValueContainerData containerData,
+      KeyValueContainerMetadataInspector.Mode mode) throws Exception {
+    System.setProperty(KeyValueContainerMetadataInspector.SYSTEM_PROPERTY,
+        mode.toString());
+    try {
+      ContainerInspectorUtil.load();
+      try {
+        return runInspectorAndGetReport(containerData);
+      } finally {
+        // Only unload after a successful load.
+        ContainerInspectorUtil.unload();
+      }
+    } finally {
+      System.clearProperty(
+          KeyValueContainerMetadataInspector.SYSTEM_PROPERTY);
+    }
+  }
+
+  /**
+   * Parses the container's metadata while capturing the inspector's report
+   * log, then deserializes the captured text as JSON.
+   * Capturing is stopped even when parsing throws, so the capturer does not
+   * stay attached to the report logger.
+   *
+   * @param containerData metadata of the container to parse.
+   * @return the captured log output parsed as a JSON object. Callers in this
+   *     class expect null when nothing was logged.
+   */
+  private JsonObject runInspectorAndGetReport(
+      KeyValueContainerData containerData) throws Exception {
+    // Use an empty layout so the captured log has no prefix and can be
+    // parsed as json.
+    GenericTestUtils.LogCapturer capturer =
+        GenericTestUtils.LogCapturer.captureLogs(
+            KeyValueContainerMetadataInspector.REPORT_LOG,
+            new PatternLayout());
+    try {
+      KeyValueContainerUtil.parseKVContainerData(containerData, getConf());
+    } finally {
+      capturer.stopCapturing();
+    }
+    String output = capturer.getOutput();
+    capturer.clearOutput();
+
+    return new Gson().fromJson(output, JsonObject.class);
+  }
+
+  /**
+   * Creates a container through {@code createOpenContainer} and closes it
+   * before returning it.
+   *
+   * @param normalBlocks passed on unchanged to {@code createOpenContainer}.
+   * @return the container, after {@code close()} has been called on it.
+   */
+  private KeyValueContainer createClosedContainer(int normalBlocks)
+      throws Exception {
+    KeyValueContainer closedContainer = createOpenContainer(normalBlocks);
+    closedContainer.close();
+    return closedContainer;
+  }
+
+  /**
+   * Creates a container with ID {@code CONTAINER_ID} using the superclass
+   * helper and returns it without closing it.
+   *
+   * @param normalBlocks passed as the helper's second argument.
+   * @return the container returned by the superclass helper.
+   */
+  private KeyValueContainer createOpenContainer(int normalBlocks)
+      throws Exception {
+    // NOTE(review): the trailing 0 is presumably the number of deleted
+    // blocks to create; confirm against the superclass helper.
+    KeyValueContainer openContainer =
+        super.createContainerWithBlocks(CONTAINER_ID, normalBlocks, 0);
+    return openContainer;
+  }
+
+  /**
+   * Asserts that every expected message occurs somewhere in the log output.
+   *
+   * @param logOutput captured log text to search.
+   * @param expectedMessages substrings that must all be present.
+   */
+  private void containsAllStrings(String logOutput,
+      String[] expectedMessages) {
+    for (String wanted : expectedMessages) {
+      boolean present = logOutput.contains(wanted);
+      Assert.assertTrue("Log output did not contain \"" +
+              wanted + "\"", present);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
 
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index cf8b22c..eb76c5e 100644
--- 
a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++ 
b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -253,21 +253,29 @@ public abstract class GenericTestUtils {
 
     public static LogCapturer captureLogs(Log l) {
       Logger logger = ((Log4JLogger) l).getLogger();
-      return new LogCapturer(logger);
+      return new LogCapturer(logger, getDefaultLayout());
     }
 
     public static LogCapturer captureLogs(org.slf4j.Logger logger) {
-      return new LogCapturer(toLog4j(logger));
+      return new LogCapturer(toLog4j(logger), getDefaultLayout());
     }
 
-    private LogCapturer(Logger logger) {
-      this.logger = logger;
+    public static LogCapturer captureLogs(org.slf4j.Logger logger,
+        Layout layout) {
+      return new LogCapturer(toLog4j(logger), layout);
+    }
+
+    private static Layout getDefaultLayout() {
       Appender defaultAppender = Logger.getRootLogger().getAppender("stdout");
       if (defaultAppender == null) {
         defaultAppender = Logger.getRootLogger().getAppender("console");
       }
-      final Layout layout = (defaultAppender == null) ? new PatternLayout() :
+      return (defaultAppender == null) ? new PatternLayout() :
           defaultAppender.getLayout();
+    }
+
+    private LogCapturer(Logger logger, Layout layout) {
+      this.logger = logger;
       this.appender = new WriterAppender(layout, sw);
       logger.addAppender(this.appender);
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to