This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new 445eaf1f4a HDDS-11290. Container scanner should keep scanning after 
non-fatal errors (#7127)
445eaf1f4a is described below

commit 445eaf1f4a8748d0187629d65eca61ba93bc18f0
Author: Ethan Rose <[email protected]>
AuthorDate: Mon Nov 4 15:42:40 2024 -0500

    HDDS-11290. Container scanner should keep scanning after non-fatal errors 
(#7127)
---
 .../container/common/impl/HddsDispatcher.java      |  12 +-
 .../container/common/interfaces/Container.java     |  85 +---
 .../ozone/container/common/interfaces/Handler.java |   2 -
 .../container/common/interfaces/ScanResult.java    |  33 ++
 .../container/common/utils/ContainerLogger.java    |   7 +-
 .../container/keyvalue/KeyValueContainer.java      |  16 +-
 .../container/keyvalue/KeyValueContainerCheck.java | 491 +++++++++++----------
 .../ozone/container/keyvalue/KeyValueHandler.java  |   3 +-
 .../AbstractBackgroundContainerScanner.java        |  10 +
 .../ozoneimpl/BackgroundContainerDataScanner.java  |  46 +-
 .../BackgroundContainerMetadataScanner.java        |   9 +-
 .../container/ozoneimpl/ContainerController.java   |   2 +-
 .../container/ozoneimpl/ContainerScanError.java    |  69 +++
 .../ozone/container/ozoneimpl/DataScanResult.java  |  70 +++
 .../container/ozoneimpl/MetadataScanResult.java    |  89 ++++
 .../ozoneimpl/OnDemandContainerDataScanner.java    |  38 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   3 +-
 .../ozone/container/common/ContainerTestUtils.java |  36 +-
 .../keyvalue/TestKeyValueContainerCheck.java       | 133 ++----
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   6 +-
 .../TestBackgroundContainerDataScanner.java        |  19 +-
 .../TestBackgroundContainerMetadataScanner.java    |   8 +-
 .../ozoneimpl/TestContainerScannersAbstract.java   |  22 +-
 .../TestOnDemandContainerDataScanner.java          |  13 +-
 ...tBackgroundContainerDataScannerIntegration.java |   9 +-
 ...kgroundContainerMetadataScannerIntegration.java |   3 +-
 .../TestContainerScannerIntegrationAbstract.java   |  44 +-
 ...estOnDemandContainerDataScannerIntegration.java |   2 +-
 28 files changed, 756 insertions(+), 524 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 60bb1759b1..28aa3d8588 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -60,6 +60,8 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
+import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
 import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
 import org.apache.hadoop.util.Time;
@@ -70,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -79,7 +82,6 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
@@ -396,10 +398,10 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
         try {
           // TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open
           //  container instead.
-          handler.markContainerUnhealthy(container,
-              ScanResult.unhealthy(ScanResult.FailureType.WRITE_FAILURE,
-                  new File(container.getContainerData().getContainerPath()),
-                  new StorageContainerException(result)));
+          ContainerScanError error = new 
ContainerScanError(ContainerScanError.FailureType.WRITE_FAILURE,
+              new File(container.getContainerData().getContainerPath()),
+              new StorageContainerException(result));
+          handler.markContainerUnhealthy(container, 
DataScanResult.fromErrors(Collections.singletonList(error)));
           LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID);
         } catch (IOException ioe) {
           // just log the error here in case marking the container fails,
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 12c6835ee7..62e8150e7b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -33,71 +33,13 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
 
 /**
  * Interface for Container Operations.
  */
 public interface Container<CONTAINERDATA extends ContainerData> {
-  /**
-   * Encapsulates the result of a container scan.
-   */
-  class ScanResult {
-    /**
-     * Represents the reason a container scan failed and a container should
-     * be marked unhealthy.
-     */
-    public enum FailureType {
-      MISSING_CONTAINER_DIR,
-      MISSING_METADATA_DIR,
-      MISSING_CONTAINER_FILE,
-      MISSING_CHUNKS_DIR,
-      MISSING_CHUNK_FILE,
-      CORRUPT_CONTAINER_FILE,
-      CORRUPT_CHUNK,
-      INCONSISTENT_CHUNK_LENGTH,
-      INACCESSIBLE_DB,
-      WRITE_FAILURE,
-      DELETED_CONTAINER
-    }
-
-    private final boolean healthy;
-    private final File unhealthyFile;
-    private final FailureType failureType;
-    private final Throwable exception;
-
-    private ScanResult(boolean healthy, FailureType failureType,
-        File unhealthyFile, Throwable exception) {
-      this.healthy = healthy;
-      this.unhealthyFile = unhealthyFile;
-      this.failureType = failureType;
-      this.exception = exception;
-    }
-
-    public static ScanResult healthy() {
-      return new ScanResult(true, null, null, null);
-    }
-
-    public static ScanResult unhealthy(FailureType type, File failingFile,
-        Throwable exception) {
-      return new ScanResult(false, type, failingFile, exception);
-    }
-
-    public boolean isHealthy() {
-      return healthy;
-    }
-
-    public File getUnhealthyFile() {
-      return unhealthyFile;
-    }
-
-    public FailureType getFailureType() {
-      return failureType;
-    }
-
-    public Throwable getException() {
-      return exception;
-    }
-  }
 
   /**
    * Creates a container.
@@ -227,10 +169,10 @@ public interface Container<CONTAINERDATA extends 
ContainerData> {
 
   /**
    * check and report the structural integrity of the container.
-   * @return true if the integrity checks pass
-   * Scan the container metadata to detect corruption.
+   * @return A {@link MetadataScanResult} encapsulating the result of the scan.
+   * @throws InterruptedException if the scanning thread is interrupted before 
it completes.
    */
-  ScanResult scanMetaData() throws InterruptedException;
+  MetadataScanResult scanMetaData() throws InterruptedException;
 
   /**
    * Return if the container data should be checksum verified to detect
@@ -243,15 +185,14 @@ public interface Container<CONTAINERDATA extends 
ContainerData> {
   /**
    * Perform checksum verification for the container data.
    *
-   * @param throttler A reference of {@link DataTransferThrottler} used to
-   *                  perform I/O bandwidth throttling
-   * @param canceler  A reference of {@link Canceler} used to cancel the
-   *                  I/O bandwidth throttling (e.g. for shutdown purpose).
-   * @return true if the checksum verification succeeds
-   *         false otherwise
-   * @throws InterruptedException if the scan is interrupted.
-   */
-  ScanResult scanData(DataTransferThrottler throttler, Canceler canceler)
+   * @param throttler       A reference of {@link DataTransferThrottler} used 
to
+   *                        perform I/O bandwidth throttling
+   * @param canceler        A reference of {@link Canceler} used to cancel the
+   *                        I/O bandwidth throttling (e.g. for shutdown 
purpose).
+   * @return A {@link DataScanResult} encapsulating the result of the scan.
+   * @throws InterruptedException if the scanning thread is interrupted before 
it completes.
+   */
+  DataScanResult scanData(DataTransferThrottler throttler, Canceler canceler)
       throws InterruptedException;
 
   /** Acquire read lock. */
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 1579f4af8e..77a4d97878 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -43,8 +43,6 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.ratis.statemachine.StateMachine;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
-
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
  * should have an implementation for Handler.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ScanResult.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ScanResult.java
new file mode 100644
index 0000000000..a7a1f541e2
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ScanResult.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
+
+import java.util.List;
+
+/**
+ * Encapsulates the result of a container scan.
+ */
+public interface ScanResult {
+  boolean isHealthy();
+
+  boolean isDeleted();
+
+  List<ContainerScanError> getErrors();
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
index 6a5c5c1175..92940b0194 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
@@ -22,7 +22,7 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 
 /**
  * Utility class defining methods to write to the datanode container log.
@@ -91,10 +91,7 @@ public final class ContainerLogger {
    */
   public static void logUnhealthy(ContainerData containerData,
       ScanResult reason) {
-    String message = reason.getFailureType() + " for file " +
-        reason.getUnhealthyFile() +
-        ". Message: " + reason.getException().getMessage();
-    LOG.error(getMessage(containerData, message));
+    LOG.error(getMessage(containerData, reason.toString()));
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index c5e2795d74..b4ff62e52d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -62,6 +62,8 @@ import 
org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -940,11 +942,9 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   }
 
   @Override
-  public ScanResult scanMetaData() throws InterruptedException {
-    long containerId = containerData.getContainerID();
+  public MetadataScanResult scanMetaData() throws InterruptedException {
     KeyValueContainerCheck checker =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), config,
-            containerId, containerData.getVolume(), this);
+        new KeyValueContainerCheck(config, this);
     return checker.fastCheck();
   }
 
@@ -963,7 +963,7 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
   }
 
   @Override
-  public ScanResult scanData(DataTransferThrottler throttler, Canceler 
canceler)
+  public DataScanResult scanData(DataTransferThrottler throttler, Canceler 
canceler)
       throws InterruptedException {
     if (!shouldScanData()) {
       throw new IllegalStateException("The checksum verification can not be" +
@@ -971,11 +971,7 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
           + containerData.getState());
     }
 
-    long containerId = containerData.getContainerID();
-    KeyValueContainerCheck checker =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), config,
-            containerId, containerData.getVolume(), this);
-
+    KeyValueContainerCheck checker = new KeyValueContainerCheck(config, this);
     return checker.fullCheck(throttler, canceler);
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 194c8c1309..080e4611dd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -18,23 +18,24 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -46,7 +47,15 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
+import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError.FailureType;
+import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
 import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
@@ -64,25 +73,28 @@ public class KeyValueContainerCheck {
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyValueContainerCheck.class);
 
-  private long containerID;
-  private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
-  private ConfigurationSource checkConfig;
-
-  private String metadataPath;
-  private HddsVolume volume;
-  private KeyValueContainer container;
+  private final long containerID;
+  private final ConfigurationSource checkConfig;
+
+  private final String metadataPath;
+  private final HddsVolume volume;
+  private final KeyValueContainer container;
+  // Container data already loaded in the datanode's memory. Used when the 
container data cannot be loaded from the
+  // disk, for example, because the container was deleted during a scan.
+  private final KeyValueContainerData containerDataFromMemory;
+  // Container data read from the container file on disk. Used to verify the 
integrity of the container.
+  // This is not loaded until a scan begins.
+  private KeyValueContainerData containerDataFromDisk;
   private static final DirectBufferPool BUFFER_POOL = new DirectBufferPool();
 
-  public KeyValueContainerCheck(String metadataPath, ConfigurationSource conf,
-      long containerID, HddsVolume volume, KeyValueContainer container) {
-    Preconditions.checkArgument(metadataPath != null);
-
+  public KeyValueContainerCheck(ConfigurationSource conf, KeyValueContainer 
container) {
     this.checkConfig = conf;
-    this.containerID = containerID;
-    this.onDiskContainerData = null;
-    this.metadataPath = metadataPath;
-    this.volume = volume;
     this.container = container;
+    this.containerDataFromDisk = null;
+    this.containerDataFromMemory = this.container.getContainerData();
+    this.containerID = containerDataFromMemory.getContainerID();
+    this.metadataPath = containerDataFromMemory.getMetadataPath();
+    this.volume = containerDataFromMemory.getVolume();
   }
 
   /**
@@ -92,50 +104,18 @@ public class KeyValueContainerCheck {
    *
    * @return true : integrity checks pass, false : otherwise.
    */
-  public ScanResult fastCheck() throws InterruptedException {
-    LOG.debug("Running basic checks for container {};", containerID);
+  public MetadataScanResult fastCheck() throws InterruptedException {
+    LOG.debug("Running metadata checks for container {}", containerID);
 
     try {
-      // Container directory should exist.
-      File containerDir = new File(metadataPath).getParentFile();
-      if (!containerDir.exists()) {
-        return ScanResult.unhealthy(
-            ScanResult.FailureType.MISSING_CONTAINER_DIR,
-            containerDir, new FileNotFoundException("Container directory " +
-                containerDir + " not found."));
+      List<ContainerScanError> metadataErrors = scanMetadata();
+      if (containerIsDeleted()) {
+        return MetadataScanResult.deleted();
       }
-
-      // Metadata directory should exist.
-      File metadataDir = new File(metadataPath);
-      if (!metadataDir.exists()) {
-        return 
ScanResult.unhealthy(ScanResult.FailureType.MISSING_METADATA_DIR,
-            metadataDir, new FileNotFoundException("Metadata directory " +
-                metadataDir + " not found."));
-      }
-
-      // Container file should be valid.
-      File containerFile = KeyValueContainer
-          .getContainerFile(metadataPath, containerID);
-      try {
-        loadContainerData(containerFile);
-      } catch (FileNotFoundException ex) {
-        return ScanResult.unhealthy(
-            ScanResult.FailureType.MISSING_CONTAINER_FILE, containerFile, ex);
-      } catch (IOException ex) {
-        return ScanResult.unhealthy(
-            ScanResult.FailureType.CORRUPT_CONTAINER_FILE, containerFile, ex);
-      }
-
-      // Chunks directory should exist.
-      File chunksDir = new File(onDiskContainerData.getChunksPath());
-      if (!chunksDir.exists()) {
-        return ScanResult.unhealthy(ScanResult.FailureType.MISSING_CHUNKS_DIR,
-            chunksDir, new FileNotFoundException("Chunks directory " +
-                chunksDir + " not found."));
-      }
-
-      return checkContainerFile(containerFile);
+      return MetadataScanResult.fromErrors(metadataErrors);
     } finally {
+      // IO operations during the scan will throw different types of 
exceptions if the thread is interrupted.
+      // the only consistent indicator of interruption in this case is the 
thread's interrupt flag.
       if (Thread.currentThread().isInterrupted()) {
         throw new InterruptedException("Metadata scan of container " +
             containerID + " interrupted.");
@@ -143,6 +123,52 @@ public class KeyValueContainerCheck {
     }
   }
 
+  private List<ContainerScanError> scanMetadata() {
+    List<ContainerScanError> metadataErrors = new ArrayList<>();
+    // Container directory should exist.
+    // If it does not, we cannot continue the scan.
+    File containerDir = new File(metadataPath).getParentFile();
+    if (!containerDir.exists()) {
+      metadataErrors.add(new 
ContainerScanError(FailureType.MISSING_CONTAINER_DIR,
+          containerDir, new FileNotFoundException("Container directory " + 
containerDir + " not found.")));
+      return metadataErrors;
+    }
+
+    // Metadata directory within the container directory should exist.
+    // If it does not, no further scanning can be done.
+    File metadataDir = new File(metadataPath);
+    if (!metadataDir.exists()) {
+      metadataErrors.add(new 
ContainerScanError(FailureType.MISSING_METADATA_DIR, metadataDir,
+          new FileNotFoundException("Metadata directory " + metadataDir + " 
not found.")));
+      return metadataErrors;
+    }
+
+    // Container file inside the metadata directory should be valid.
+    // If it is not, no further scanning can be done.
+    File containerFile = KeyValueContainer.getContainerFile(metadataPath, 
containerID);
+    try {
+      loadContainerData(containerFile);
+    } catch (FileNotFoundException ex) {
+      metadataErrors.add(new 
ContainerScanError(FailureType.MISSING_CONTAINER_FILE, containerFile, ex));
+      return metadataErrors;
+    } catch (IOException ex) {
+      metadataErrors.add(new 
ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, containerFile, ex));
+      return metadataErrors;
+    }
+    metadataErrors.addAll(checkContainerFile(containerFile));
+
+    // Chunks directory should exist.
+    // The metadata scan can continue even if this fails, since it does not 
look at the data inside the chunks
+    // directory.
+    File chunksDir = new File(containerDataFromDisk.getChunksPath());
+    if (!chunksDir.exists()) {
+      metadataErrors.add(new 
ContainerScanError(FailureType.MISSING_CHUNKS_DIR, chunksDir,
+          new FileNotFoundException("Chunks directory " + chunksDir + " not 
found.")));
+    }
+
+    return metadataErrors;
+  }
+
   /**
    * full checks comprise scanning all metadata inside the container.
    * Including the KV database. These checks are intrusive, consume more
@@ -154,152 +180,127 @@ public class KeyValueContainerCheck {
    *
    * @return true : integrity checks pass, false : otherwise.
    */
-  public ScanResult fullCheck(DataTransferThrottler throttler,
-      Canceler canceler) throws InterruptedException {
-    ScanResult result = fastCheck();
-    if (result.isHealthy()) {
-      result = scanData(throttler, canceler);
+  public DataScanResult fullCheck(DataTransferThrottler throttler, Canceler 
canceler) throws InterruptedException {
+    // If the metadata check fails, we cannot do the data check.
+    // The DataScanResult will have an empty tree with 0 checksums to indicate 
this.
+    MetadataScanResult metadataResult = fastCheck();
+    if (metadataResult.isDeleted()) {
+      return DataScanResult.deleted();
+    } else if (!metadataResult.isHealthy()) {
+      return DataScanResult.unhealthyMetadata(metadataResult);
     }
 
-    if (!result.isHealthy() && Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException("Data scan of container " + containerID +
-          " interrupted.");
+    LOG.debug("Running data checks for container {}", containerID);
+    try {
+      // TODO HDDS-10374 this tree will get updated with the container's 
contents as it is scanned.
+      ContainerMerkleTree dataTree = new ContainerMerkleTree();
+      List<ContainerScanError> dataErrors = scanData(dataTree, throttler, 
canceler);
+      if (containerIsDeleted()) {
+        return DataScanResult.deleted();
+      }
+      return DataScanResult.fromErrors(dataErrors, dataTree);
+    } finally {
+      // IO operations during the scan will throw different types of 
exceptions if the thread is interrupted.
+      // the only consistent indicator of interruption in this case is the 
thread's interrupt flag.
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException("Data scan of container " + containerID 
+
+            " interrupted.");
+      }
+    }
+  }
+
+  private List<ContainerScanError> scanData(ContainerMerkleTree currentTree, 
DataTransferThrottler throttler,
+      Canceler canceler) {
+    Preconditions.checkState(containerDataFromDisk != null,
+        "invoke loadContainerData prior to calling this function");
+
+    List<ContainerScanError> errors = new ArrayList<>();
+
+    // If the DB cannot be loaded, we cannot proceed with the data scan.
+    File dbFile = containerDataFromDisk.getDbFile();
+    if (!dbFile.exists() || !dbFile.canRead()) {
+      String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
+          + "] for Container [" + containerID + "] metadata path ["
+          + metadataPath + "]";
+      errors.add(new ContainerScanError(FailureType.INACCESSIBLE_DB, dbFile, 
new IOException(dbFileErrorMsg)));
+      return errors;
+    }
+
+    try {
+      try (DBHandle db = BlockUtils.getDB(containerDataFromDisk, checkConfig);
+           BlockIterator<BlockData> kvIter = db.getStore().getBlockIterator(
+               containerDataFromDisk.getContainerID(),
+               containerDataFromDisk.getUnprefixedKeyFilter())) {
+        // If the container was deleted during the scan, stop trying to 
process its data.
+        while (kvIter.hasNext() && !containerIsDeleted()) {
+          List<ContainerScanError> blockErrors = scanBlock(db, dbFile, 
kvIter.nextBlock(), throttler, canceler,
+              currentTree);
+          errors.addAll(blockErrors);
+        }
+      }
+    } catch (IOException ex) {
+      errors.add(new ContainerScanError(FailureType.INACCESSIBLE_DB, dbFile, 
ex));
     }
 
-    return result;
+    return errors;
   }
 
-  private ScanResult checkContainerFile(File containerFile) {
+  private List<ContainerScanError> checkContainerFile(File containerFile) {
     /*
      * compare the values in the container file loaded from disk,
      * with the values we are expecting
      */
     String dbType;
     Preconditions
-        .checkState(onDiskContainerData != null, "Container File not loaded");
+        .checkState(containerDataFromDisk != null, "Container File not 
loaded");
+    
+    List<ContainerScanError> errors = new ArrayList<>();
 
+    // If the file checksum does not match, we will not try to read the file.
     try {
-      ContainerUtils.verifyContainerFileChecksum(onDiskContainerData, 
checkConfig);
+      ContainerUtils.verifyContainerFileChecksum(containerDataFromDisk, 
checkConfig);
     } catch (IOException ex) {
-      return 
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
-          containerFile, ex);
+      errors.add(new ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, 
containerFile, ex));
+      return errors;
     }
 
-    if (onDiskContainerData.getContainerType()
+    // All other failures are independent.
+    if (containerDataFromDisk.getContainerType()
         != ContainerProtos.ContainerType.KeyValueContainer) {
       String errStr = "Bad Container type in Containerdata for " + containerID;
-      return 
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
-          containerFile, new IOException(errStr));
+      errors.add(new ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, 
containerFile, new IOException(errStr)));
     }
 
-    if (onDiskContainerData.getContainerID() != containerID) {
+    if (containerDataFromDisk.getContainerID() != containerID) {
       String errStr =
           "Bad ContainerID field in Containerdata for " + containerID;
-      return 
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
-          containerFile, new IOException(errStr));
+      errors.add(new ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, 
containerFile, new IOException(errStr)));
     }
 
-    dbType = onDiskContainerData.getContainerDBType();
+    dbType = containerDataFromDisk.getContainerDBType();
     if (!dbType.equals(CONTAINER_DB_TYPE_ROCKSDB)) {
       String errStr = "Unknown DBType [" + dbType
           + "] in Container File for  [" + containerID + "]";
-      return 
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
-          containerFile, new IOException(errStr));
+      errors.add(new ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, 
containerFile, new IOException(errStr)));
     }
 
-    KeyValueContainerData kvData = onDiskContainerData;
-    if (!metadataPath.equals(kvData.getMetadataPath())) {
+    if (!metadataPath.equals(containerDataFromDisk.getMetadataPath())) {
       String errStr =
           "Bad metadata path in Containerdata for " + containerID + "Expected 
["
-              + metadataPath + "] Got [" + kvData.getMetadataPath()
+              + metadataPath + "] Got [" + 
containerDataFromDisk.getMetadataPath()
               + "]";
-      return 
ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
-          containerFile, new IOException(errStr));
+      errors.add(new ContainerScanError(FailureType.CORRUPT_CONTAINER_FILE, 
containerFile, new IOException(errStr)));
     }
 
-    return ScanResult.healthy();
+    return errors;
   }
 
-  private ScanResult scanData(DataTransferThrottler throttler,
-      Canceler canceler) {
-    /*
-     * Check the integrity of the DB inside each container.
-     * 1. iterate over each key (Block) and locate the chunks for the block
-     * 2. garbage detection (TBD): chunks which exist in the filesystem,
-     *    but not in the DB. This function will be implemented in HDDS-1202
-     * 3. chunk checksum verification.
-     */
-    Preconditions.checkState(onDiskContainerData != null,
-        "invoke loadContainerData prior to calling this function");
-
-    File dbFile = KeyValueContainerLocationUtil
-        .getContainerDBFile(onDiskContainerData);
-
-    if (!dbFile.exists() || !dbFile.canRead()) {
-      String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
-          + "] for Container [" + containerID + "] metadata path ["
-          + metadataPath + "]";
-      return ScanResult.unhealthy(ScanResult.FailureType.INACCESSIBLE_DB,
-          dbFile, new IOException(dbFileErrorMsg));
-    }
-
-    onDiskContainerData.setDbFile(dbFile);
-
-    try {
-      try (DBHandle db = BlockUtils.getDB(onDiskContainerData, checkConfig);
-          BlockIterator<BlockData> kvIter = db.getStore().getBlockIterator(
-              onDiskContainerData.getContainerID(),
-              onDiskContainerData.getUnprefixedKeyFilter())) {
-
-        while (kvIter.hasNext()) {
-          BlockData block = kvIter.nextBlock();
-
-          // If holding read lock for the entire duration, including wait()
-          // calls in DataTransferThrottler, would effectively make other
-          // threads throttled.
-          // Here try optimistically and retry with the container lock to
-          // make sure reading the latest record. If the record is just 
removed,
-          // the block should be skipped to scan.
-          ScanResult result = scanBlock(block, throttler, canceler);
-          if (!result.isHealthy()) {
-            if (result.getFailureType() ==
-                ScanResult.FailureType.MISSING_CHUNK_FILE) {
-              if (getBlockDataFromDBWithLock(db, block) != null) {
-                // Block was not deleted, the failure is legitimate.
-                return result;
-              } else {
-                // If schema V3 and container details not in DB or
-                // if containerDBPath is removed
-                if ((onDiskContainerData.hasSchema(OzoneConsts.SCHEMA_V3) &&
-                    db.getStore().getMetadataTable().get(
-                      onDiskContainerData.getBcsIdKey()) == null)  ||
-                    !new File(onDiskContainerData.getDbFile()
-                        .getAbsolutePath()).exists()) {
-                  // Container has been deleted. Skip the rest of the blocks.
-                  return ScanResult.unhealthy(
-                      ScanResult.FailureType.DELETED_CONTAINER,
-                      result.getUnhealthyFile(), result.getException());
-                }
-
-                // Block may have been deleted during the scan.
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Scanned outdated blockData {} in container {}.",
-                      block, containerID);
-                }
-              }
-            } else {
-              // All other failures should be treated as errors.
-              return result;
-            }
-          }
-        }
-      }
-    } catch (IOException ex) {
-      return ScanResult.unhealthy(ScanResult.FailureType.INACCESSIBLE_DB,
-          dbFile, ex);
-    }
-
-    return ScanResult.healthy();
+  /**
+   * Checks if a container has been deleted based on its state in datanode 
memory. This state change is the first
+   * step in deleting a container on a datanode and is done in a thread-safe 
manner. See KeyValueHandler#deleteInternal.
+   */
+  private boolean containerIsDeleted() {
+    return containerDataFromMemory.getState() == State.DELETED;
   }
 
   /**
@@ -315,7 +316,7 @@ public class KeyValueContainerCheck {
   private BlockData getBlockDataFromDB(DBHandle db, BlockData block)
       throws IOException {
     String blockKey =
-        onDiskContainerData.getBlockKey(block.getBlockID().getLocalID());
+        containerDataFromDisk.getBlockKey(block.getBlockID().getLocalID());
     return db.getStore().getBlockDataTable().get(blockKey);
   }
 
@@ -329,62 +330,98 @@ public class KeyValueContainerCheck {
    * @return blockData in DB
    * @throws IOException
    */
-  private BlockData getBlockDataFromDBWithLock(DBHandle db, BlockData block)
+  private boolean blockInDBWithLock(DBHandle db, BlockData block)
       throws IOException {
     container.readLock();
     try {
-      return getBlockDataFromDB(db, block);
+      return getBlockDataFromDB(db, block) != null;
     } finally {
       container.readUnlock();
     }
   }
 
-  private ScanResult scanBlock(BlockData block, DataTransferThrottler 
throttler,
-      Canceler canceler) {
-    ContainerLayoutVersion layout = onDiskContainerData.getLayoutVersion();
+  private List<ContainerScanError> scanBlock(DBHandle db, File dbFile, 
BlockData block,
+      DataTransferThrottler throttler, Canceler canceler, ContainerMerkleTree 
currentTree) {
+    ContainerLayoutVersion layout = containerDataFromDisk.getLayoutVersion();
+
+    List<ContainerScanError> blockErrors = new ArrayList<>();
+
+    // If the chunk or block disappears from the disk during this scan, stop 
checking it.
+    // Future checksum checks will likely fail and the block may have been 
deleted.
+    // At the end we will check the DB with a lock to determine whether the 
file was actually deleted.
+    boolean fileMissing = false;
+    Iterator<ContainerProtos.ChunkInfo> chunkIter = 
block.getChunks().iterator();
+    while (chunkIter.hasNext() && !fileMissing) {
+      ContainerProtos.ChunkInfo chunk = chunkIter.next();
+      // This is populated with a file if we are able to locate the correct 
directory.
+      Optional<File> optionalFile = Optional.empty();
 
-    for (ContainerProtos.ChunkInfo chunk : block.getChunks()) {
-      File chunkFile;
+      // If we cannot locate where to read chunk files from, then we cannot 
proceed with scanning this block.
       try {
-        chunkFile = layout.getChunkFile(onDiskContainerData,
-            block.getBlockID(), chunk.getChunkName());
-      } catch (IOException ex) {
-        return ScanResult.unhealthy(
-            ScanResult.FailureType.MISSING_CHUNK_FILE,
-            new File(onDiskContainerData.getChunksPath()), ex);
+        optionalFile = Optional.of(layout.getChunkFile(containerDataFromDisk,
+            block.getBlockID(), chunk.getChunkName()));
+      } catch (StorageContainerException ex) {
+        // The parent directory that contains chunk files does not exist.
+        if (ex.getResult() == ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR) {
+          blockErrors.add(new 
ContainerScanError(FailureType.MISSING_CHUNKS_DIR,
+              new File(containerDataFromDisk.getChunksPath()), ex));
+        } else {
+          // Unknown exception occurred trying to locate the file.
+          blockErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK,
+              new File(containerDataFromDisk.getChunksPath()), ex));
+        }
       }
 
-      if (!chunkFile.exists()) {
-        // In EC, client may write empty putBlock in padding block nodes.
-        // So, we need to make sure, chunk length > 0, before declaring
-        // the missing chunk file.
-        if (block.getChunks().size() > 0 && block
-            .getChunks().get(0).getLen() > 0) {
-          return 
ScanResult.unhealthy(ScanResult.FailureType.MISSING_CHUNK_FILE,
-              chunkFile, new IOException("Missing chunk file " +
-                  chunkFile.getAbsolutePath()));
+      if (optionalFile.isPresent()) {
+        File chunkFile = optionalFile.get();
+        if (!chunkFile.exists()) {
+          // In EC, client may write empty putBlock in padding block nodes.
+          // So, we need to make sure, chunk length > 0, before declaring
+          // the missing chunk file.
+          if (!block.getChunks().isEmpty() && 
block.getChunks().get(0).getLen() > 0) {
+            ContainerScanError error = new 
ContainerScanError(FailureType.MISSING_CHUNK_FILE,
+                new File(containerDataFromDisk.getChunksPath()), new 
IOException("Missing chunk file " +
+                chunkFile.getAbsolutePath()));
+            blockErrors.add(error);
+          }
+        } else if (chunk.getChecksumData().getType() != 
ContainerProtos.ChecksumType.NONE) {
+          int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum();
+          ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum);
+          // Keep scanning the block even if there are errors with individual 
chunks.
+          blockErrors.addAll(verifyChecksum(block, chunk, chunkFile, layout, 
buffer, currentTree, throttler, canceler));
+          buffer.clear();
+          BUFFER_POOL.returnBuffer(buffer);
         }
-      } else if (chunk.getChecksumData().getType()
-          != ContainerProtos.ChecksumType.NONE) {
-        int bytesPerChecksum = chunk.getChecksumData().getBytesPerChecksum();
-        ByteBuffer buffer = BUFFER_POOL.getBuffer(bytesPerChecksum);
-        ScanResult result = verifyChecksum(block, chunk, chunkFile, layout, 
buffer,
-            throttler, canceler);
-        buffer.clear();
-        BUFFER_POOL.returnBuffer(buffer);
-        if (!result.isHealthy()) {
-          return result;
+      }
+
+      fileMissing = !optionalFile.isPresent() || !optionalFile.get().exists();
+    }
+
+    try {
+      if (fileMissing && !blockInDBWithLock(db, block)) {
+        // The chunk/block file was missing from the disk, but after checking 
the DB with a lock it is not there either.
+        // This means the block was deleted while the scan was running 
(without a lock) and all errors in this block
+        // can be ignored.
+        blockErrors.clear();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Scanned outdated blockData {} in container {}", block, 
containerID);
         }
       }
+    } catch (IOException ex) {
+      // Failed to read the block metadata from the DB.
+      blockErrors.add(new ContainerScanError(FailureType.INACCESSIBLE_DB, 
dbFile, ex));
     }
 
-    return ScanResult.healthy();
+    return blockErrors;
   }
 
-  private static ScanResult verifyChecksum(BlockData block,
-      ContainerProtos.ChunkInfo chunk, File chunkFile,
-      ContainerLayoutVersion layout, ByteBuffer buffer,
-      DataTransferThrottler throttler, Canceler canceler) {
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  private static List<ContainerScanError> verifyChecksum(BlockData block,
+      ContainerProtos.ChunkInfo chunk, File chunkFile, ContainerLayoutVersion 
layout, ByteBuffer buffer,
+      ContainerMerkleTree currentTree, DataTransferThrottler throttler, 
Canceler canceler) {
+
+    List<ContainerScanError> scanErrors = new ArrayList<>();
+
     ChecksumData checksumData =
         ChecksumData.getFromProtoBuf(chunk.getChecksumData());
     int checksumCount = checksumData.getChecksums().size();
@@ -397,7 +434,10 @@ public class KeyValueContainerCheck {
       if (layout == ContainerLayoutVersion.FILE_PER_BLOCK) {
         channel.position(chunk.getOffset());
       }
-      for (int i = 0; i < checksumCount; i++) {
+      // Only report one error per chunk. Reporting corruption at every "bytes 
per checksum" interval will lead to a
+      // large amount of errors when a full chunk is corrupted.
+      boolean chunkHealthy = true;
+      for (int i = 0; i < checksumCount && chunkHealthy; i++) {
         // limit last read for FILE_PER_BLOCK, to avoid reading next chunk
         if (layout == ContainerLayoutVersion.FILE_PER_BLOCK &&
             i == checksumCount - 1 &&
@@ -429,44 +469,33 @@ public class KeyValueContainerCheck {
                   StringUtils.bytes2Hex(expected.asReadOnlyByteBuffer()),
                   StringUtils.bytes2Hex(actual.asReadOnlyByteBuffer()),
                   block.getBlockID());
-          return ScanResult.unhealthy(
-              ScanResult.FailureType.CORRUPT_CHUNK, chunkFile,
-              new IOException(message));
+          scanErrors.add(new ContainerScanError(FailureType.CORRUPT_CHUNK, 
chunkFile,
+              new OzoneChecksumException(message)));
+          chunkHealthy = false;
         }
       }
-      if (bytesRead != chunk.getLen()) {
+      // If all the checksums match, also check that the length stored in the 
metadata matches the number of bytes
+      // seen on the disk.
+      if (chunkHealthy && bytesRead != chunk.getLen()) {
         String message = String
             .format("Inconsistent read for chunk=%s expected length=%d"
                     + " actual length=%d for block %s",
                 chunk.getChunkName(),
                 chunk.getLen(), bytesRead, block.getBlockID());
-        return ScanResult.unhealthy(
-            ScanResult.FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
-            new IOException(message));
+        scanErrors.add(new 
ContainerScanError(FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
+            new IOException(message)));
       }
     } catch (IOException ex) {
-      return ScanResult.unhealthy(
-          ScanResult.FailureType.MISSING_CHUNK_FILE, chunkFile, ex);
+      scanErrors.add(new ContainerScanError(FailureType.MISSING_CHUNK_FILE, 
chunkFile, ex));
     }
 
-    return ScanResult.healthy();
+    return scanErrors;
   }
 
   private void loadContainerData(File containerFile) throws IOException {
-    onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
+    containerDataFromDisk = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(containerFile);
-    onDiskContainerData.setVolume(volume);
+    containerDataFromDisk.setVolume(volume);
+    
containerDataFromDisk.setDbFile(KeyValueContainerLocationUtil.getContainerDBFile(containerDataFromDisk));
   }
-
-  @VisibleForTesting
-  void setContainerData(KeyValueContainerData containerData) {
-    onDiskContainerData = containerData;
-  }
-
-  @VisibleForTesting
-  ScanResult scanContainer(DataTransferThrottler throttler,
-                           Canceler canceler) {
-    return scanData(throttler, canceler);
-  }
-
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index fa10ffe20a..d587748e6f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -134,9 +134,10 @@ import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerDataProto.State.RECOVERING;
+
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import static 
org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST;
 import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.statemachine.StateMachine;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java
index 0ba01a191f..cb80a696bd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,15 @@ public abstract class AbstractBackgroundContainerScanner 
extends Thread {
     }
   }
 
+  public static void logUnhealthyScanResult(long containerID, ScanResult 
result, Logger log) {
+    LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY. 
{}", containerID, result);
+    if (log.isDebugEnabled()) {
+      StringBuilder allErrorString = new StringBuilder();
+      result.getErrors().forEach(r -> allErrorString.append(r).append('\n'));
+      log.debug("Complete list of errors detected while scanning container 
{}:\n{}", containerID, allErrorString);
+    }
+  }
+
   /**
    * Shutdown the current container scanning thread.
    * If the thread is already being shutdown, the call will block until the
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
index 84d21a2017..1a4f0bf646 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -32,9 +33,6 @@ import java.time.Instant;
 import java.util.Iterator;
 import java.util.Optional;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult.FailureType.DELETED_CONTAINER;
-
 /**
  * Data scanner that full checks a volume. Each volume gets a separate thread.
  */
@@ -53,10 +51,11 @@ public class BackgroundContainerDataScanner extends
   private static final String NAME_FORMAT = "ContainerDataScanner(%s)";
   private final ContainerDataScannerMetrics metrics;
   private final long minScanGap;
+  private final ContainerChecksumTreeManager checksumManager;
 
   public BackgroundContainerDataScanner(ContainerScannerConfiguration conf,
                                         ContainerController controller,
-                                        HddsVolume volume) {
+                                        HddsVolume volume, 
ContainerChecksumTreeManager checksumManager) {
     super(String.format(NAME_FORMAT, volume), conf.getDataScanInterval());
     this.controller = controller;
     this.volume = volume;
@@ -64,6 +63,7 @@ public class BackgroundContainerDataScanner extends
     canceler = new Canceler();
     this.metrics = ContainerDataScannerMetrics.create(volume.toString());
     this.minScanGap = conf.getContainerScanMinGap();
+    this.checksumManager = checksumManager;
   }
 
   private boolean shouldScan(Container<?> container) {
@@ -87,27 +87,33 @@ public class BackgroundContainerDataScanner extends
     ContainerData containerData = c.getContainerData();
     long containerId = containerData.getContainerID();
     logScanStart(containerData);
-    ScanResult result = c.scanData(throttler, canceler);
-
-    // Metrics for skipped containers should not be updated.
-    if (result.getFailureType() == DELETED_CONTAINER) {
-      LOG.error("Container [{}] has been deleted.",
-          containerId, result.getException());
-      return;
-    }
-    if (!result.isHealthy()) {
-      LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
-          containerId, result.getException());
-      boolean containerMarkedUnhealthy = 
controller.markContainerUnhealthy(containerId, result);
-      if (containerMarkedUnhealthy) {
-        metrics.incNumUnHealthyContainers();
+    DataScanResult result = c.scanData(throttler, canceler);
+
+    if (result.isDeleted()) {
+      LOG.debug("Container [{}] has been deleted during the data scan.", 
containerId);
+    } else {
+      if (!result.isHealthy()) {
+        logUnhealthyScanResult(containerId, result, LOG);
+
+        // Only increment the number of unhealthy containers if the container 
was not already unhealthy.
+        // TODO HDDS-11593 (to be merged in to the feature branch from 
master): Scanner counters will start from zero
+        //  at the beginning of each run, so this will need to be incremented 
for every unhealthy container seen
+        //  regardless of its previous state.
+        if (controller.markContainerUnhealthy(containerId, result)) {
+          metrics.incNumUnHealthyContainers();
+        }
       }
+      checksumManager.writeContainerDataTree(containerData, 
result.getDataTree());
+      metrics.incNumContainersScanned();
     }
 
-    metrics.incNumContainersScanned();
+    // Even if the container was deleted, mark the scan as completed since we 
already logged it as starting.
     Instant now = Instant.now();
     logScanCompleted(containerData, now);
-    controller.updateDataScanTimestamp(containerId, now);
+
+    if (!result.isDeleted()) {
+      controller.updateDataScanTimestamp(containerId, now);
+    }
   }
 
   @Override
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
index 4f76efd083..fdbb1e62bb 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
@@ -75,10 +75,13 @@ public class BackgroundContainerMetadataScanner extends
       return;
     }
 
-    Container.ScanResult result = container.scanMetaData();
+    MetadataScanResult result = container.scanMetaData();
+    if (result.isDeleted()) {
+      LOG.debug("Container [{}] has been deleted during the metadata scan.", 
containerID);
+      return;
+    }
     if (!result.isHealthy()) {
-      LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
-          containerID, result.getException());
+      logUnhealthyScanResult(containerID, result, LOG);
       boolean containerMarkedUnhealthy = 
controller.markContainerUnhealthy(containerID, result);
       if (containerMarkedUnhealthy) {
         metrics.incNumUnHealthyContainers();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index cab055438d..84ddba759f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -43,7 +43,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 
 /**
  * Control plane for container management in datanode.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
new file mode 100644
index 0000000000..8fbd8f6887
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import java.io.File;
+
+/**
+ * This class is used to identify any error that may be seen while scanning a 
container.
+ */
+public class ContainerScanError {
+  /**
+   * Represents the reason a container scan failed and a container should
+   * be marked unhealthy.
+   */
+  public enum FailureType {
+    MISSING_CONTAINER_DIR,
+    MISSING_METADATA_DIR,
+    MISSING_CONTAINER_FILE,
+    MISSING_CHUNKS_DIR,
+    MISSING_CHUNK_FILE,
+    CORRUPT_CONTAINER_FILE,
+    CORRUPT_CHUNK,
+    INCONSISTENT_CHUNK_LENGTH,
+    INACCESSIBLE_DB,
+    WRITE_FAILURE,
+  }
+
+  private final File unhealthyFile;
+  private final FailureType failureType;
+  private final Throwable exception;
+
+  public ContainerScanError(FailureType failure, File unhealthyFile, Exception 
exception) {
+    this.unhealthyFile = unhealthyFile;
+    this.failureType = failure;
+    this.exception = exception;
+  }
+
+  public File getUnhealthyFile() {
+    return unhealthyFile;
+  }
+
+  public FailureType getFailureType() {
+    return failureType;
+  }
+
+  public Throwable getException() {
+    return exception;
+  }
+
+  @Override
+  public String toString() {
+    return failureType + " for file " + unhealthyFile + " with exception: " + 
exception;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/DataScanResult.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/DataScanResult.java
new file mode 100644
index 0000000000..12c0f36be7
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/DataScanResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents the result of a container data scan.
+ * A container data scan will do a full metadata check, and check the contents 
of all block data within the container.
+ * The result will contain all the errors seen while scanning the container, 
and a ContainerMerkleTree representing
+ * the data that the scan saw on the disk when it ran.
+ */
+public final class DataScanResult extends MetadataScanResult {
+
+  private final ContainerMerkleTree dataTree;
+  // Only deleted results can be interned. Healthy results will still have 
different trees.
+  private static final DataScanResult DELETED = new 
DataScanResult(Collections.emptyList(),
+      new ContainerMerkleTree(), true);
+
+  private DataScanResult(List<ContainerScanError> errors, ContainerMerkleTree 
dataTree, boolean deleted) {
+    super(errors, deleted);
+    this.dataTree = dataTree;
+  }
+
+  /**
+   * Constructs an unhealthy data scan result which was aborted before 
scanning any data due to a metadata error.
+   * This data scan result will have an empty data tree with a zero checksum 
to indicate that no data was scanned.
+   */
+  public static DataScanResult unhealthyMetadata(MetadataScanResult result) {
+    Preconditions.checkArgument(!result.isHealthy());
+    return new DataScanResult(result.getErrors(), new ContainerMerkleTree(), 
false);
+  }
+
+  /**
+   * Constructs a data scan result representing a container that was deleted 
during the scan.
+   */
+  public static DataScanResult deleted() {
+    return DELETED;
+  }
+
+  /**
+   * Constructs a data scan result whose health will be determined based on 
the presence of errors.
+   */
+  public static DataScanResult fromErrors(List<ContainerScanError> errors, 
ContainerMerkleTree dataTree) {
+    return new DataScanResult(errors, dataTree, false);
+  }
+
+  public ContainerMerkleTree getDataTree() {
+    return dataTree;
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/MetadataScanResult.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/MetadataScanResult.java
new file mode 100644
index 0000000000..e394ba54fe
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/MetadataScanResult.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents the result of a container metadata scan.
+ * A metadata scan only checks the existence of container metadata files and 
the checksum of the .container file.
+ * It does not check the data in the container and therefore will not generate 
a ContainerMerkleTree.
+ */
+public class MetadataScanResult implements ScanResult {
+
+  private final List<ContainerScanError> errors;
+  private final boolean deleted;
+  // Results are immutable. Intern the common cases.
+  private static final MetadataScanResult HEALTHY_RESULT = new 
MetadataScanResult(Collections.emptyList(), false);
+  private static final MetadataScanResult DELETED = new 
MetadataScanResult(Collections.emptyList(), true);
+
+  protected MetadataScanResult(List<ContainerScanError> errors, boolean 
deleted) {
+    this.errors = errors;
+    this.deleted = deleted;
+  }
+
+  /**
+   * Constructs a metadata scan result whose health will be determined based 
on the presence of errors.
+   */
+  public static MetadataScanResult fromErrors(List<ContainerScanError> errors) 
{
+    if (errors.isEmpty()) {
+      return HEALTHY_RESULT;
+    } else {
+      return new MetadataScanResult(errors, false);
+    }
+  }
+
+  /**
+   * Constructs a metadata scan result representing a container that was 
deleted during the scan.
+   */
+  public static MetadataScanResult deleted() {
+    return DELETED;
+  }
+
+  @Override
+  public boolean isDeleted() {
+    return deleted;
+  }
+
+  @Override
+  public boolean isHealthy() {
+    return errors.isEmpty();
+  }
+
+  @Override
+  public List<ContainerScanError> getErrors() {
+    return errors;
+  }
+
+  /**
+   * @return A string representation of the first error in this result, or a 
string indicating the result is healthy.
+   */
+  @Override
+  public String toString() {
+    if (errors.isEmpty()) {
+      return "Scan result has 0 errors";
+    } else if (errors.size() == 1) {
+      return "Scan result has 1 error: " + errors.get(0);
+    } else {
+      return "Scan result has " + errors.size() + " errors. The first error 
is: " + errors.get(0);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
index 2cfd7930cb..eb0f3eedb0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult.FailureType.DELETED_CONTAINER;
+import static 
org.apache.hadoop.ozone.container.ozoneimpl.AbstractBackgroundContainerScanner.logUnhealthyScanResult;
 
 /**
  * Class for performing on demand scans of containers.
@@ -133,28 +133,30 @@ public final class OnDemandContainerDataScanner {
       ContainerData containerData = container.getContainerData();
       logScanStart(containerData);
 
-      ScanResult result =
-          container.scanData(instance.throttler, instance.canceler);
+      ScanResult result = container.scanData(instance.throttler, 
instance.canceler);
       // Metrics for skipped containers should not be updated.
-      if (result.getFailureType() == DELETED_CONTAINER) {
-        LOG.error("Container [{}] has been deleted.",
-            containerId, result.getException());
-        return;
-      }
-      if (!result.isHealthy()) {
-        LOG.error("Corruption detected in container [{}]." +
-                "Marking it UNHEALTHY.", containerId, result.getException());
-        boolean containerMarkedUnhealthy = instance.containerController
-            .markContainerUnhealthy(containerId, result);
-        if (containerMarkedUnhealthy) {
-          instance.metrics.incNumUnHealthyContainers();
+      if (result.isDeleted()) {
+        LOG.debug("Container [{}] has been deleted during the data scan.", 
containerId);
+      } else {
+        if (!result.isHealthy()) {
+          logUnhealthyScanResult(containerId, result, LOG);
+          boolean containerMarkedUnhealthy = instance.containerController
+              .markContainerUnhealthy(containerId, result);
+          if (containerMarkedUnhealthy) {
+            instance.metrics.incNumUnHealthyContainers();
+          }
         }
+        // TODO HDDS-10374 will need to update the merkle tree here as well.
+        instance.metrics.incNumContainersScanned();
       }
 
-      instance.metrics.incNumContainersScanned();
+      // Even if the container was deleted, mark the scan as completed since 
we already logged it as starting.
       Instant now = Instant.now();
       logScanCompleted(containerData, now);
-      instance.containerController.updateDataScanTimestamp(containerId, now);
+
+      if (!result.isDeleted()) {
+        instance.containerController.updateDataScanTimestamp(containerId, now);
+      }
     } catch (IOException e) {
       LOG.warn("Unexpected exception while scanning container "
           + containerId, e);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2cd98e4137..8ae838a7e5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -126,6 +126,7 @@ public class OzoneContainer {
   private final ReplicationServer replicationServer;
   private DatanodeDetails datanodeDetails;
   private StateContext context;
+
   private final ContainerChecksumTreeManager checksumTreeManager;
   private ScheduledExecutorService dbCompactionExecutorService;
 
@@ -384,7 +385,7 @@ public class OzoneContainer {
     dataScanners = new ArrayList<>();
     for (StorageVolume v : volumeSet.getVolumesList()) {
       BackgroundContainerDataScanner s =
-          new BackgroundContainerDataScanner(c, controller, (HddsVolume) v);
+          new BackgroundContainerDataScanner(c, controller, (HddsVolume) v, 
checksumTreeManager);
       s.start();
       dataScanners.add(s);
       backgroundScanners.add(s);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 823ac09bd3..d24f442548 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -39,13 +39,13 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
@@ -65,6 +65,9 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
+import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
+import org.apache.hadoop.ozone.container.ozoneimpl.MetadataScanResult;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
@@ -256,7 +259,7 @@ public final class ContainerTestUtils {
 
   public static void setupMockContainer(
       Container<ContainerData> c, boolean shouldScanData,
-      ScanResult metadataScanResult, ScanResult dataScanResult,
+      MetadataScanResult metadataScanResult, DataScanResult dataScanResult,
       AtomicLong containerIdSeq, HddsVolume vol) {
     ContainerData data = mock(ContainerData.class);
     when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
@@ -265,8 +268,7 @@ public final class ContainerTestUtils {
     when(c.getContainerData().getVolume()).thenReturn(vol);
 
     try {
-      when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
-          .thenReturn(dataScanResult);
+      when(c.scanData(any(DataTransferThrottler.class), 
any(Canceler.class))).thenReturn(dataScanResult);
       Mockito.lenient().when(c.scanMetaData()).thenReturn(metadataScanResult);
     } catch (InterruptedException ex) {
       // Mockito.when invocations will not throw this exception. It is just
@@ -274,22 +276,30 @@ public final class ContainerTestUtils {
     }
   }
 
+  public static DataScanResult getHealthyDataScanResult() {
+    return DataScanResult.fromErrors(Collections.emptyList(), new 
ContainerMerkleTree());
+  }
+
   /**
    * Construct an unhealthy scan result to use for testing purposes.
    */
-  public static ScanResult getUnhealthyScanResult() {
-    return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CHUNK,
-        new File(""),
-        new IOException("Fake corruption failure for testing"));
+  public static DataScanResult getUnhealthyDataScanResult() {
+    ContainerScanError error = new 
ContainerScanError(ContainerScanError.FailureType.CORRUPT_CHUNK,
+        new File(""), new IOException("Fake data corruption failure for 
testing"));
+    return DataScanResult.fromErrors(Collections.singletonList(error), new 
ContainerMerkleTree());
+  }
+
+  public static MetadataScanResult getHealthyMetadataScanResult() {
+    return MetadataScanResult.fromErrors(Collections.emptyList());
   }
 
   /**
-   * Construct an unhealthy scan result with DELETED_CONTAINER failure type.
+   * Construct an unhealthy scan result to use for testing purposes.
    */
-  public static ScanResult getDeletedContainerResult() {
-    return ScanResult.unhealthy(ScanResult.FailureType.DELETED_CONTAINER,
-        new File(""),
-        new IOException("Fake deleted container exception"));
+  public static MetadataScanResult getUnhealthyMetadataScanResult() {
+    ContainerScanError error = new 
ContainerScanError(ContainerScanError.FailureType.CORRUPT_CONTAINER_FILE,
+        new File(""), new IOException("Fake metadata corruption failure for 
testing"));
+    return DataScanResult.fromErrors(Collections.singletonList(error));
   }
 
   public static KeyValueContainer addContainerToDeletedDir(
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index b24a6f04c4..a470cce402 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
@@ -35,16 +35,11 @@ import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration
 import java.io.File;
 import java.io.RandomAccessFile;
 
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult.FailureType.DELETED_CONTAINER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 
 /**
  * Basic sanity test for the KeyValueContainerCheck class.
@@ -69,11 +64,7 @@ public class TestKeyValueContainerCheck
     // test Closed Container
     KeyValueContainer container = createContainerWithBlocks(containerID,
         normalBlocks, deletedBlocks, true);
-    KeyValueContainerData containerData = container.getContainerData();
-
-    KeyValueContainerCheck kvCheck =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID, containerData.getVolume(), container);
+    KeyValueContainerCheck kvCheck = new KeyValueContainerCheck(conf, 
container);
 
     // first run checks on a Open Container
     boolean valid = kvCheck.fastCheck().isHealthy();
@@ -108,9 +99,7 @@ public class TestKeyValueContainerCheck
 
     container.close();
 
-    KeyValueContainerCheck kvCheck =
-        new KeyValueContainerCheck(containerData.getMetadataPath(), conf,
-            containerID, containerData.getVolume(), container);
+    KeyValueContainerCheck kvCheck = new KeyValueContainerCheck(conf, 
container);
 
     File dbFile = KeyValueContainerLocationUtil
         .getContainerDBFile(containerData);
@@ -143,85 +132,47 @@ public class TestKeyValueContainerCheck
   }
 
   @ContainerTestVersionInfo.ContainerTest
-  void testKeyValueContainerCheckDeleted(ContainerTestVersionInfo versionInfo)
-      throws Exception {
+  void testKeyValueContainerCheckDeletedContainer(ContainerTestVersionInfo 
versionInfo) throws Exception {
     initTestData(versionInfo);
 
-    long containerID = 103;
-    int deletedBlocks = 3;
-    int normalBlocks = 0;
-    OzoneConfiguration conf = getConf();
-    ContainerScannerConfiguration sc = conf.getObject(
-        ContainerScannerConfiguration.class);
-
-    // Create container with deleting blocks
-    KeyValueContainer container = createContainerWithBlocks(containerID,
-        normalBlocks, deletedBlocks, false);
+    KeyValueContainer container = createContainerWithBlocks(3,
+        3, 3, true);
     container.close();
-
-    KeyValueContainerData containerData = container.getContainerData();
-
-    // Remove chunks directory to trigger a scanBlock exception
-    File chunksDir = new File(containerData.getChunksPath());
-    assertTrue(chunksDir.exists());
-    assertTrue(new File(containerData.getChunksPath()).delete());
-    assertFalse(chunksDir.exists());
-
-    // Create mockContainerData to scan blocks that are
-    // just about to be deleted.
-    // Then fail because blocks and container has been deleted from disk.
-    KeyValueContainerData mockContainerData = 
mock(KeyValueContainerData.class);
-    when(mockContainerData.hasSchema(eq(OzoneConsts.SCHEMA_V3)))
-        .thenReturn(containerData.hasSchema(OzoneConsts.SCHEMA_V3));
-    when(mockContainerData.getVolume()).thenReturn(containerData.getVolume());
-    when(mockContainerData.getMetadataPath())
-        .thenReturn(containerData.getMetadataPath());
-    when(mockContainerData.getContainerID())
-        .thenReturn(containerData.getContainerID());
-
-    File mockdbFile = mock(File.class);
-    when(mockdbFile.getAbsolutePath()).thenReturn("");
-    // For Schema V2 mimic container DB deletion during Container Scan.
-    when(mockContainerData.getDbFile()).thenReturn(containerData.getDbFile(),
-        containerData.getDbFile(), mockdbFile);
-
-    when(mockContainerData.getContainerDBType())
-        .thenReturn(containerData.getContainerDBType());
-    when(mockContainerData.getSchemaVersion())
-        .thenReturn(containerData.getSchemaVersion());
-
-    // Mimic the scenario where scanning starts just before
-    // blocks are marked for deletion.
-    // That is, UnprefixedKeyFilter will return blocks
-    // that will soon be deleted.
-    when(mockContainerData.getUnprefixedKeyFilter())
-        .thenReturn(containerData.getDeletingBlockKeyFilter());
-    when(mockContainerData.getLayoutVersion())
-        .thenReturn(containerData.getLayoutVersion());
-    when(mockContainerData.getChunksPath())
-        .thenReturn(containerData.getChunksPath());
-    when(mockContainerData.getBlockKey(anyLong()))
-        .thenAnswer(invocationOnMock -> {
-          return containerData.getBlockKey(invocationOnMock.getArgument(0));
-        });
-    when(mockContainerData.containerPrefix())
-        .thenReturn(containerData.containerPrefix());
-    when(mockContainerData.getBcsIdKey())
-        .thenReturn(containerData.getBcsIdKey());
-
-    KeyValueContainerCheck kvCheck = new KeyValueContainerCheck(
-        containerData.getMetadataPath(), conf, containerData.getContainerID(),
-        containerData.getVolume(), container);
-
-    kvCheck.setContainerData(mockContainerData);
-
-    DataTransferThrottler throttler = new DataTransferThrottler(
-        sc.getBandwidthPerVolume());
-    Canceler canceler = null;
-
-    Container.ScanResult result = kvCheck.scanContainer(throttler, canceler);
-
+    KeyValueContainerCheck kvCheck = new KeyValueContainerCheck(getConf(), 
container);
+    // The full container should exist and pass a scan.
+    ScanResult result = kvCheck.fullCheck(mock(DataTransferThrottler.class), 
mock(Canceler.class));
+    assertTrue(result.isHealthy());
+    assertFalse(result.isDeleted());
+
+    // When a container is not marked for deletion and it has peices missing, 
the scan should fail.
+    File metadataDir = new File(container.getContainerData().getChunksPath());
+    FileUtils.deleteDirectory(metadataDir);
+    assertFalse(metadataDir.exists());
+    result = kvCheck.fullCheck(mock(DataTransferThrottler.class), 
mock(Canceler.class));
     assertFalse(result.isHealthy());
-    assertEquals(DELETED_CONTAINER, result.getFailureType());
+    assertFalse(result.isDeleted());
+
+    // Once the container is marked for deletion, the scan should pass even if 
some of the internal pieces are missing.
+    // Here the metadata directory has been deleted.
+    container.markContainerForDelete();
+    result = kvCheck.fullCheck(mock(DataTransferThrottler.class), 
mock(Canceler.class));
+    assertTrue(result.isHealthy());
+    assertTrue(result.isDeleted());
+
+    // Now the data directory is deleted.
+    File chunksDir = new File(container.getContainerData().getChunksPath());
+    FileUtils.deleteDirectory(chunksDir);
+    assertFalse(chunksDir.exists());
+    result = kvCheck.fullCheck(mock(DataTransferThrottler.class), 
mock(Canceler.class));
+    assertTrue(result.isHealthy());
+    assertTrue(result.isDeleted());
+
+    // Now the whole container directory is gone.
+    File containerDir = new 
File(container.getContainerData().getContainerPath());
+    FileUtils.deleteDirectory(containerDir);
+    assertFalse(containerDir.exists());
+    result = kvCheck.fullCheck(mock(DataTransferThrottler.class), 
mock(Canceler.class));
+    assertTrue(result.isHealthy());
+    assertTrue(result.isDeleted());
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 61f592ab01..af0c430c86 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -247,16 +247,14 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
     // When volume is failed, the call to mark the container unhealthy should
     // be ignored.
     hddsVolume.setState(StorageVolume.VolumeState.FAILED);
-    handler.markContainerUnhealthy(container,
-        ContainerTestUtils.getUnhealthyScanResult());
+    handler.markContainerUnhealthy(container, 
ContainerTestUtils.getUnhealthyDataScanResult());
     assertFalse(ContainerChecksumTreeManager.checksumFileExist(container));
     verify(mockIcrSender, never()).send(any());
 
     // When volume is healthy, ICR should be sent when container is marked
     // unhealthy.
     hddsVolume.setState(StorageVolume.VolumeState.NORMAL);
-    handler.markContainerUnhealthy(container,
-        ContainerTestUtils.getUnhealthyScanResult());
+    handler.markContainerUnhealthy(container, 
ContainerTestUtils.getUnhealthyDataScanResult());
     assertTrue(ContainerChecksumTreeManager.checksumFileExist(container));
     verify(mockIcrSender, atMostOnce()).send(any());
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
index 06509af9f4..681c5efc1a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
@@ -19,11 +19,12 @@
  */
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,7 +37,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -62,7 +64,8 @@ public class TestBackgroundContainerDataScanner extends
   @BeforeEach
   public void setup() {
     super.setup();
-    scanner = new BackgroundContainerDataScanner(conf, controller, vol);
+    scanner = new BackgroundContainerDataScanner(conf, controller, vol,
+        new ContainerChecksumTreeManager(new OzoneConfiguration()));
   }
 
   @Test
@@ -156,11 +159,13 @@ public class TestBackgroundContainerDataScanner extends
   @Override
   public void testUnhealthyContainerRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(ScanResult.healthy());
-    when(unhealthy.scanData(any(DataTransferThrottler.class),
-        any(Canceler.class))).thenReturn(getUnhealthyScanResult());
+    when(unhealthy.scanMetaData()).thenReturn(getHealthyMetadataScanResult());
+    when(unhealthy.scanData(any(DataTransferThrottler.class), 
any(Canceler.class)))
+        .thenReturn(getUnhealthyDataScanResult());
+    // If a container is not already in an unhealthy state, the controller 
will return true from this method.
     
when(controller.markContainerUnhealthy(eq(unhealthy.getContainerData().getContainerID()),
         any())).thenReturn(true);
+
     setContainers(unhealthy, healthy);
 
     // First iteration should find the unhealthy container.
@@ -177,7 +182,7 @@ public class TestBackgroundContainerDataScanner extends
     // Update the mock to reflect this.
     when(unhealthy.getContainerState()).thenReturn(UNHEALTHY);
     assertTrue(unhealthy.shouldScanData());
-
+    // Since the container is already unhealthy, the real controller would 
return false from this method.
     
when(controller.markContainerUnhealthy(eq(unhealthy.getContainerData().getContainerID()),
         any())).thenReturn(false);
     scanner.runIteration();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
index c4c40de247..f6a41b8b57 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,7 +35,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyDataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyMetadataScanResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -132,10 +132,10 @@ public class TestBackgroundContainerMetadataScanner 
extends
   @Override
   public void testUnhealthyContainerRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(getUnhealthyScanResult());
+    
when(unhealthy.scanMetaData()).thenReturn(getUnhealthyMetadataScanResult());
     when(unhealthy.scanData(
         any(DataTransferThrottler.class), any(Canceler.class)))
-        .thenReturn(ScanResult.healthy());
+        .thenReturn(getHealthyDataScanResult());
     
when(controller.markContainerUnhealthy(eq(unhealthy.getContainerData().getContainerID()),
         any())).thenReturn(true);
     setContainers(unhealthy, healthy);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
index 5d7d8c3100..180fac231a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -41,8 +40,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getDeletedContainerResult;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyDataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyMetadataScanResult;
 import static 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.CONTAINER_SCAN_MIN_GAP_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.any;
@@ -187,30 +188,35 @@ public abstract class TestContainerScannersAbstract {
   }
 
   private ContainerController mockContainerController() {
+    DataScanResult healthyData = getHealthyDataScanResult();
+    DataScanResult unhealthyData = getUnhealthyDataScanResult();
+    MetadataScanResult healthyMetadata = getHealthyMetadataScanResult();
+    MetadataScanResult unhealthyMetadata = getUnhealthyMetadataScanResult();
+
     // healthy container
     ContainerTestUtils.setupMockContainer(healthy,
-        true, ScanResult.healthy(), ScanResult.healthy(),
+        true, healthyMetadata, healthyData,
         CONTAINER_SEQ_ID, vol);
 
     // Open container (only metadata can be scanned)
     ContainerTestUtils.setupMockContainer(openContainer,
-        false, ScanResult.healthy(), ScanResult.healthy(),
+        false, healthyMetadata, healthyData,
         CONTAINER_SEQ_ID, vol);
 
     // unhealthy container (corrupt data)
     ContainerTestUtils.setupMockContainer(corruptData,
-        true, ScanResult.healthy(), getUnhealthyScanResult(),
+        true, healthyMetadata, unhealthyData,
         CONTAINER_SEQ_ID, vol);
 
     // unhealthy container (corrupt metadata). To simulate container still
     // being open while metadata is corrupted, shouldScanData will return 
false.
     ContainerTestUtils.setupMockContainer(openCorruptMetadata,
-        false, getUnhealthyScanResult(), ScanResult.healthy(),
+        false, unhealthyMetadata, unhealthyData,
         CONTAINER_SEQ_ID, vol);
 
     // Mock container that has been deleted during scan.
     ContainerTestUtils.setupMockContainer(deletedContainer,
-        true, ScanResult.healthy(), getDeletedContainerResult(),
+        true, healthyMetadata, DataScanResult.deleted(),
         CONTAINER_SEQ_ID, vol);
 
     containers.addAll(Arrays.asList(healthy, corruptData, openCorruptMetadata,
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
index c08dc935ad..6f4b2efc13 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -40,7 +40,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
-import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -143,7 +144,7 @@ public class TestOnDemandContainerDataScanner extends
             OnDemandContainerDataScanner.getCanceler()))
         .thenAnswer((Answer<ScanResult>) invocation -> {
           latch.await();
-          return getUnhealthyScanResult();
+          return getUnhealthyDataScanResult();
         });
     Optional<Future<?>> onGoingScan = OnDemandContainerDataScanner
         .scanContainer(corruptData);
@@ -172,6 +173,8 @@ public class TestOnDemandContainerDataScanner extends
     resultFutureList.add(
         OnDemandContainerDataScanner.scanContainer(openCorruptMetadata));
     resultFutureList.add(OnDemandContainerDataScanner.scanContainer(healthy));
+    // Deleted containers will not count towards the scan count metric.
+    
resultFutureList.add(OnDemandContainerDataScanner.scanContainer(deletedContainer));
     waitOnScannerToFinish(resultFutureList);
     OnDemandScannerMetrics metrics = OnDemandContainerDataScanner.getMetrics();
     //Containers with shouldScanData = false shouldn't increase
@@ -261,10 +264,10 @@ public class TestOnDemandContainerDataScanner extends
   @Override
   public void testUnhealthyContainerRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(ScanResult.healthy());
+    when(unhealthy.scanMetaData()).thenReturn(getHealthyMetadataScanResult());
     when(unhealthy.scanData(
         any(DataTransferThrottler.class), any(Canceler.class)))
-        .thenReturn(getUnhealthyScanResult());
+        .thenReturn(getUnhealthyDataScanResult());
     
when(controller.markContainerUnhealthy(eq(unhealthy.getContainerData().getContainerID()),
         any())).thenReturn(true);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
index adc1234c2e..b372c434a0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
@@ -89,6 +89,13 @@ class TestBackgroundContainerDataScannerIntegration
 
     // Wait for SCM to get a report of the unhealthy replica.
     waitForScmToSeeUnhealthyReplica(containerID);
-    corruption.assertLogged(logCapturer);
+
+    // If the block is truncated, every chunk in the block will register an 
error.
+    if (corruption == ContainerCorruptions.TRUNCATED_BLOCK) {
+      corruption.assertLogged(containerID, 2, logCapturer);
+    } else {
+      // Other corruption types will only lead to a single error.
+      corruption.assertLogged(containerID, 1, logCapturer);
+    }
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
index 52da1035ca..b6bbc60241 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
@@ -115,6 +115,7 @@ class TestBackgroundContainerMetadataScannerIntegration
     // Once the unhealthy replica is reported, the open container's lifecycle
     // state in SCM should move to closed.
     waitForScmToCloseContainer(openContainerID);
-    corruption.assertLogged(logCapturer);
+    corruption.assertLogged(openContainerID, 1, logCapturer);
+    corruption.assertLogged(closedContainerID, 1, logCapturer);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
index f53e041b54..36e83f4f0a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError.FailureType;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
@@ -62,15 +63,15 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -243,7 +244,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       assertFalse(chunksDir.exists());
-    }, ScanResult.FailureType.MISSING_CHUNKS_DIR),
+    }, FailureType.MISSING_CHUNKS_DIR),
 
     MISSING_METADATA_DIR(container -> {
       File metadataDir =
@@ -256,13 +257,13 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       assertFalse(metadataDir.exists());
-    }, ScanResult.FailureType.MISSING_METADATA_DIR),
+    }, FailureType.MISSING_METADATA_DIR),
 
     MISSING_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       assertTrue(containerFile.delete());
       assertFalse(containerFile.exists());
-    }, ScanResult.FailureType.MISSING_CONTAINER_FILE),
+    }, FailureType.MISSING_CONTAINER_FILE),
 
     MISSING_CONTAINER_DIR(container -> {
       File containerDir =
@@ -274,7 +275,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       assertFalse(containerDir.exists());
-    }, ScanResult.FailureType.MISSING_CONTAINER_DIR),
+    }, FailureType.MISSING_CONTAINER_DIR),
 
     MISSING_BLOCK(container -> {
       File chunksDir = new File(
@@ -288,17 +289,17 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           throw new UncheckedIOException(ex);
         }
       }
-    }, ScanResult.FailureType.MISSING_CHUNK_FILE),
+    }, FailureType.MISSING_CHUNK_FILE),
 
     CORRUPT_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       corruptFile(containerFile);
-    }, ScanResult.FailureType.CORRUPT_CONTAINER_FILE),
+    }, FailureType.CORRUPT_CONTAINER_FILE),
 
     TRUNCATED_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       truncateFile(containerFile);
-    }, ScanResult.FailureType.CORRUPT_CONTAINER_FILE),
+    }, FailureType.CORRUPT_CONTAINER_FILE),
 
     CORRUPT_BLOCK(container -> {
       File chunksDir = new 
File(container.getContainerData().getContainerPath(),
@@ -308,7 +309,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           .findFirst();
       assertTrue(blockFile.isPresent());
       corruptFile(blockFile.get());
-    }, ScanResult.FailureType.CORRUPT_CHUNK),
+    }, FailureType.CORRUPT_CHUNK),
 
     TRUNCATED_BLOCK(container -> {
       File chunksDir = new 
File(container.getContainerData().getContainerPath(),
@@ -318,13 +319,12 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           .findFirst();
       assertTrue(blockFile.isPresent());
       truncateFile(blockFile.get());
-    }, ScanResult.FailureType.INCONSISTENT_CHUNK_LENGTH);
+    }, FailureType.INCONSISTENT_CHUNK_LENGTH);
 
     private final Consumer<Container<?>> corruption;
-    private final ScanResult.FailureType expectedResult;
+    private final FailureType expectedResult;
 
-    ContainerCorruptions(Consumer<Container<?>> corruption,
-                         ScanResult.FailureType expectedResult) {
+    ContainerCorruptions(Consumer<Container<?>> corruption, FailureType 
expectedResult) {
       this.corruption = corruption;
       this.expectedResult = expectedResult;
 
@@ -335,11 +335,14 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
     }
 
     /**
-     * Check that the correct corruption type was written to the container log.
+     * Check that the correct corruption type was written to the container log 
for the provided container.
      */
-    public void assertLogged(LogCapturer logCapturer) {
-      assertThat(logCapturer.getOutput())
-          .contains(expectedResult.toString());
+    public void assertLogged(long containerID, int numErrors, LogCapturer 
logCapturer) {
+      // Enable multiline regex mode with "(?m)". This allows ^ to check for 
the start of a line in a multiline string.
+      // The log will have captured lines from all previous tests as well 
since we re-use the same cluster.
+      Pattern logLine = Pattern.compile("(?m)^ID=" + containerID + ".*" + " 
Scan result has " + numErrors +
+          " error.*" + expectedResult.toString());
+      assertThat(logCapturer.getOutput()).containsPattern(logLine);
     }
 
     /**
@@ -364,8 +367,9 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         Path path = file.toPath();
         final byte[] original = IOUtils.readFully(Files.newInputStream(path), 
length);
 
-        final byte[] corruptedBytes = new byte[length];
-        ThreadLocalRandom.current().nextBytes(corruptedBytes);
+        // Corrupt the last byte of the last chunk. This should map to a 
single error from the scanner.
+        final byte[] corruptedBytes = Arrays.copyOf(original, length);
+        corruptedBytes[length - 1] = (byte) (original[length - 1] << 1);
 
         Files.write(path, corruptedBytes,
             StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
index 4b47d061e7..37194812c9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
@@ -110,6 +110,6 @@ class TestOnDemandContainerDataScannerIntegration
 
     // Wait for SCM to get a report of the unhealthy replica.
     waitForScmToSeeUnhealthyReplica(containerID);
-    corruption.assertLogged(logCapturer);
+    corruption.assertLogged(containerID, 1, logCapturer);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to