This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ec27d56df6 HDDS-8062. Persist reason for container replica being marked unhealthy. (#4995)
ec27d56df6 is described below

commit ec27d56df60fa012ee8a57a47c9daeb87aaff089
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Jul 11 01:33:13 2023 -0700

    HDDS-8062. Persist reason for container replica being marked unhealthy. (#4995)
---
 .../ozone/container/common/impl/ContainerData.java |  11 +
 .../ozone/container/common/impl/ContainerSet.java  |  10 +-
 .../container/common/impl/HddsDispatcher.java      |  10 +-
 .../container/common/interfaces/Container.java     |  63 ++++-
 .../ozone/container/common/interfaces/Handler.java |  10 +-
 .../CloseContainerCommandHandler.java              |   3 +-
 .../server/ratis/ContainerStateMachine.java        |   3 +-
 .../container/common/utils/ContainerLogger.java    | 164 +++++++++++++
 .../container/keyvalue/KeyValueContainer.java      |   4 +-
 .../container/keyvalue/KeyValueContainerCheck.java | 267 ++++++++++++---------
 .../container/keyvalue/KeyValueContainerData.java  |  11 -
 .../ozone/container/keyvalue/KeyValueHandler.java  |  19 +-
 .../ozoneimpl/BackgroundContainerDataScanner.java  |   9 +-
 .../BackgroundContainerMetadataScanner.java        |  12 +-
 .../container/ozoneimpl/ContainerController.java   |  14 +-
 .../ozoneimpl/OnDemandContainerDataScanner.java    |  11 +-
 .../ozone/container/common/ContainerTestUtils.java |  17 +-
 .../TestCloseContainerCommandHandler.java          |   8 +-
 .../keyvalue/TestKeyValueContainerCheck.java       |   8 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   7 +-
 .../TestBackgroundContainerDataScanner.java        |   6 +-
 .../TestBackgroundContainerMetadataScanner.java    |   6 +-
 .../ozoneimpl/TestContainerScannersAbstract.java   |  18 +-
 .../TestOnDemandContainerDataScanner.java          |  16 +-
 .../dist/dev-support/bin/dist-layout-stitching     |   1 +
 .../src/shell/conf/dn-container-log4j2.properties  |  70 ++++++
 hadoop-ozone/dist/src/shell/ozone/ozone            |   2 +-
 ...tBackgroundContainerDataScannerIntegration.java |   7 +
 ...kgroundContainerMetadataScannerIntegration.java |   6 +
 .../TestContainerScannerIntegrationAbstract.java   |  34 ++-
 ...estOnDemandContainerDataScannerIntegration.java |   6 +
 31 files changed, 634 insertions(+), 199 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 54fca5a61e..323fe1df5c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -103,6 +103,8 @@ public abstract class ContainerData {
 
   private boolean isEmpty;
 
+  private int replicaIndex;
+
   /** Timestamp of last data scan (milliseconds since Unix Epoch).
    * {@code null} if not yet scanned (or timestamp not recorded,
    * eg. in prior versions). */
@@ -164,6 +166,7 @@ public abstract class ContainerData {
     this(source.getContainerType(), source.getContainerID(),
         source.getLayoutVersion(), source.getMaxSize(),
         source.getOriginPipelineId(), source.getOriginNodeId());
+    replicaIndex = source.replicaIndex;
   }
 
   /**
@@ -196,6 +199,14 @@ public abstract class ContainerData {
     return state;
   }
 
+  public int getReplicaIndex() {
+    return replicaIndex;
+  }
+
+  public void setReplicaIndex(int replicaIndex) {
+    this.replicaIndex = replicaIndex;
+  }
+
   /**
    * Set the state of the container.
    * @param state
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 2df5d40ef9..b5dfd07d57 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -194,13 +195,14 @@ public class ContainerSet implements Iterable<Container<?>> {
     AtomicBoolean failedVolume = new AtomicBoolean(false);
     AtomicInteger containerCount = new AtomicInteger(0);
     containerMap.values().forEach(c -> {
-      if (c.getContainerData().getVolume().isFailed()) {
-        removeContainer(c.getContainerData().getContainerID());
+      ContainerData data = c.getContainerData();
+      if (data.getVolume().isFailed()) {
+        removeContainer(data.getContainerID());
         LOG.debug("Removing Container {} as the Volume {} " +
-              "has failed", c.getContainerData().getContainerID(),
-            c.getContainerData().getVolume());
+              "has failed", data.getContainerID(), data.getVolume());
         failedVolume.set(true);
         containerCount.incrementAndGet();
+        ContainerLogger.logLost(data, "Volume failure");
       }
     });
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 89c39367bd..11f67a0482 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -66,6 +66,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
@@ -73,6 +74,7 @@ import java.util.Set;
 
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
@@ -359,8 +361,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
                 || containerState == State.RECOVERING);
         // mark and persist the container state to be unhealthy
         try {
-          // TODO HDDS-7096: Use on demand scanning here instead.
-          handler.markContainerUnhealthy(container);
+          // TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open
+          //  container instead.
+          handler.markContainerUnhealthy(container,
+              ScanResult.unhealthy(ScanResult.FailureType.WRITE_FAILURE,
+                  new File(container.getContainerData().getContainerPath()),
+                  new StorageContainerException(result)));
           LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID);
         } catch (IOException ioe) {
           // just log the error here in case marking the container fails,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 5652db5d71..de59f9bbea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -41,6 +41,65 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
  * Interface for Container Operations.
  */
 public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
+  /**
+   * Encapsulates the result of a container scan.
+   */
+  class ScanResult {
+    /**
+     * Represents the reason a container scan failed and a container should
+     * be marked unhealthy.
+     */
+    public enum FailureType {
+      MISSING_CONTAINER_DIR,
+      MISSING_METADATA_DIR,
+      MISSING_CONTAINER_FILE,
+      MISSING_CHUNKS_DIR,
+      MISSING_CHUNK_FILE,
+      CORRUPT_CONTAINER_FILE,
+      CORRUPT_CHUNK,
+      INCONSISTENT_CHUNK_LENGTH,
+      INACCESSIBLE_DB,
+      WRITE_FAILURE
+    }
+
+    private final boolean healthy;
+    private final File unhealthyFile;
+    private final FailureType failureType;
+    private final Throwable exception;
+
+    private ScanResult(boolean healthy, FailureType failureType,
+        File unhealthyFile, Throwable exception) {
+      this.healthy = healthy;
+      this.unhealthyFile = unhealthyFile;
+      this.failureType = failureType;
+      this.exception = exception;
+    }
+
+    public static ScanResult healthy() {
+      return new ScanResult(true, null, null, null);
+    }
+
+    public static ScanResult unhealthy(FailureType type, File failingFile,
+        Throwable exception) {
+      return new ScanResult(false, type, failingFile, exception);
+    }
+
+    public boolean isHealthy() {
+      return healthy;
+    }
+
+    public File getUnhealthyFile() {
+      return unhealthyFile;
+    }
+
+    public FailureType getFailureType() {
+      return failureType;
+    }
+
+    public Throwable getException() {
+      return exception;
+    }
+  }
 
   /**
    * Creates a container.
@@ -174,7 +233,7 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
    * @return true if the integrity checks pass
    * Scan the container metadata to detect corruption.
    */
-  boolean scanMetaData() throws InterruptedException;
+  ScanResult scanMetaData() throws InterruptedException;
 
   /**
    * Return if the container data should be checksum verified to detect
@@ -195,6 +254,6 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
    *         false otherwise
    * @throws InterruptedException if the scan is interrupted.
    */
-  boolean scanData(DataTransferThrottler throttler, Canceler canceler)
+  ScanResult scanData(DataTransferThrottler throttler, Canceler canceler)
       throws InterruptedException;
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 62418f2bbe..2ffb9d30d1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.ratis.statemachine.StateMachine;
 
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
  * should have an implementation for Handler.
@@ -151,18 +153,22 @@ public abstract class Handler {
    * Marks the container Unhealthy. Moves the container to UNHEALTHY state.
    *
    * @param container container to update
+   * @param reason The reason the container was marked unhealthy
    * @throws IOException in case of exception
    */
-  public abstract void markContainerUnhealthy(Container container)
+  public abstract void markContainerUnhealthy(Container container,
+                                              ScanResult reason)
       throws IOException;
 
   /**
    * Moves the Container to QUASI_CLOSED state.
    *
    * @param container container to be quasi closed
+   * @param reason The reason the container was quasi closed, for logging
+   *               purposes.
    * @throws IOException
    */
-  public abstract void quasiCloseContainer(Container container)
+  public abstract void quasiCloseContainer(Container container, String reason)
       throws IOException;
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index a5ae044d4f..e7a0b2e9f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -108,7 +108,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
           // are moved to CLOSED immediately rather than going to quasi-closed.
           controller.closeContainer(containerId);
         } else {
-          controller.quasiCloseContainer(containerId);
+          controller.quasiCloseContainer(containerId,
+              "Ratis pipeline does not exist");
           LOG.info("Marking Container {} quasi closed", containerId);
         }
         break;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index e8ab1b0af8..0758b245fb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -1072,7 +1072,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     for (Long cid : container2BCSIDMap.keySet()) {
       try {
         containerController.markContainerForClose(cid);
-        containerController.quasiCloseContainer(cid);
+        containerController.quasiCloseContainer(cid,
+            "Ratis group removed");
       } catch (IOException e) {
         LOG.debug("Failed to quasi-close container {}", cid);
       }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
new file mode 100644
index 0000000000..46f8c151e3
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+
+/**
+ * Utility class defining methods to write to the datanode container log.
+ *
+ * The container log contains brief messages about container replica level
+ * events like state changes or replication/reconstruction. Messages in this
+ * log are minimal, so it can be used to track the history of every container
+ * on a datanode for a long period of time without rolling off.
+ *
+ * All messages should be logged after their corresponding event succeeds, to
+ * prevent misleading state changes or logs filling with retries on errors.
+ * Errors and retries belong in the main datanode application log.
+ */
+public final class ContainerLogger {
+
+  @VisibleForTesting
+  public static final String LOG_NAME = "ContainerLog";
+  private static final Logger LOG = LoggerFactory.getLogger(LOG_NAME);
+  private static final String FIELD_SEPARATOR = " | ";
+
+  private ContainerLogger() { }
+
+  /**
+   * Logged when an open container is first created.
+   *
+   * @param containerData The container that was created and opened.
+   */
+  public static void logOpen(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  /**
+   * Logged when a container is moved to the closing state.
+   *
+   * @param containerData The container that was marked as closing.
+   */
+  public static void logClosing(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  /**
+   * Logged when a Ratis container is moved to the quasi-closed state.
+   *
+   * @param containerData The container that was quasi-closed.
+   * @param message The reason the container was quasi-closed, if known.
+   */
+  public static void logQuasiClosed(ContainerData containerData,
+      String message) {
+    LOG.warn(getMessage(containerData, message));
+  }
+
+  /**
+   * Logged when a container is moved to the closed state.
+   *
+   * @param containerData The container that was closed.
+   */
+  public static void logClosed(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  /**
+   * Logged when a container is moved to the unhealthy state.
+   *
+   * @param containerData The container that was marked unhealthy.
+   * @param reason The reason the container was marked unhealthy.
+   */
+  public static void logUnhealthy(ContainerData containerData,
+      ScanResult reason) {
+    String message = reason.getFailureType() + " for file " +
+        reason.getUnhealthyFile() +
+        ". Message: " + reason.getException().getMessage();
+    LOG.error(getMessage(containerData, message));
+  }
+
+  /**
+   * Logged when a container is lost from this datanode. Currently this would
+   * only happen on volume failure. Container deletes do not count as lost
+   * containers.
+   *
+   * @param containerData The container that was lost.
+   * @param message The reason the container was lost.
+   */
+  public static void logLost(ContainerData containerData, String message) {
+    LOG.error(getMessage(containerData, message));
+  }
+
+  /**
+   * Logged when a container is deleted because it is empty.
+   *
+   * @param containerData The container that was deleted.
+   */
+  public static void logDeleted(ContainerData containerData, boolean force) {
+    if (force) {
+      LOG.info(getMessage(containerData, "Container force deleted"));
+    } else {
+      LOG.info(getMessage(containerData, "Empty container deleted"));
+    }
+  }
+
+  /**
+   * Logged when a container is copied in to this datanode.
+   *
+   * @param containerData The container that was imported to this datanode.
+   */
+  public static void logImported(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  /**
+   * Logged when a container is copied from this datanode.
+   *
+   * @param containerData The container that was exported from this datanode.
+   */
+  public static void logExported(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  /**
+   * Logged when a container is recovered using EC offline reconstruction.
+   *
+   * @param containerData The container that was recovered on this datanode.
+   */
+  public static void logRecovered(ContainerData containerData) {
+    LOG.info(getMessage(containerData));
+  }
+
+  private static String getMessage(ContainerData containerData,
+                                   String message) {
+    return String.join(FIELD_SEPARATOR, getMessage(containerData), message);
+  }
+
+  private static String getMessage(ContainerData containerData) {
+    return String.join(FIELD_SEPARATOR,
+        "ID=" + containerData.getContainerID(),
+        "Index=" + containerData.getReplicaIndex(),
+        "BCSID=" + containerData.getBlockCommitSequenceId(),
+        "State=" + containerData.getState());
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 85e356388d..7d5ab92e81 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -878,7 +878,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
   }
 
   @Override
-  public boolean scanMetaData() throws InterruptedException {
+  public ScanResult scanMetaData() throws InterruptedException {
     long containerId = containerData.getContainerID();
     KeyValueContainerCheck checker =
         new KeyValueContainerCheck(containerData.getMetadataPath(), config,
@@ -900,7 +900,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
   }
 
   @Override
-  public boolean scanData(DataTransferThrottler throttler, Canceler canceler)
+  public ScanResult scanData(DataTransferThrottler throttler, Canceler canceler)
       throws InterruptedException {
     if (!shouldScanData()) {
       throw new IllegalStateException("The checksum verification can not be" +
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 0425ed31b7..61a506da2c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -25,13 +25,13 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -87,24 +88,55 @@ public class KeyValueContainerCheck {
    *
    * @return true : integrity checks pass, false : otherwise.
    */
-  public boolean fastCheck() throws InterruptedException {
+  public ScanResult fastCheck() throws InterruptedException {
     LOG.debug("Running basic checks for container {};", containerID);
-    boolean valid = false;
+
     try {
-      loadContainerData();
-      checkLayout();
-      checkContainerFile();
-      valid = true;
+      // Container directory should exist.
+      File containerDir = new File(metadataPath).getParentFile();
+      if (!containerDir.exists()) {
+        return ScanResult.unhealthy(
+            ScanResult.FailureType.MISSING_CONTAINER_DIR,
+            containerDir, new FileNotFoundException("Container directory " +
+                containerDir + " not found."));
+      }
+
+      // Metadata directory should exist.
+      File metadataDir = new File(metadataPath);
+      if (!metadataDir.exists()) {
+        return ScanResult.unhealthy(ScanResult.FailureType.MISSING_METADATA_DIR,
+            metadataDir, new FileNotFoundException("Metadata directory " +
+                metadataDir + " not found."));
+      }
+
+      // Container file should be valid.
+      File containerFile = KeyValueContainer
+          .getContainerFile(metadataPath, containerID);
+      try {
+        loadContainerData(containerFile);
+      } catch (FileNotFoundException ex) {
+        return ScanResult.unhealthy(
+            ScanResult.FailureType.MISSING_CONTAINER_FILE, containerFile, ex);
+      } catch (IOException ex) {
+        return ScanResult.unhealthy(
+            ScanResult.FailureType.CORRUPT_CONTAINER_FILE, containerFile, ex);
+      }
 
-    } catch (IOException e) {
+      // Chunks directory should exist.
+      File chunksDir = new File(onDiskContainerData.getChunksPath());
+      if (!chunksDir.exists()) {
+        return ScanResult.unhealthy(ScanResult.FailureType.MISSING_CHUNKS_DIR,
+            chunksDir, new FileNotFoundException("Chunks directory " +
+                chunksDir + " not found."));
+      }
+
+      return checkContainerFile(containerFile);
+    } finally {
       if (Thread.currentThread().isInterrupted()) {
         throw new InterruptedException("Metadata scan of container " +
             containerID + " interrupted.");
       }
-      handleCorruption(e);
     }
-
-    return valid;
   }
 
   /**
@@ -118,64 +150,22 @@ public class KeyValueContainerCheck {
    *
    * @return true : integrity checks pass, false : otherwise.
    */
-  public boolean fullCheck(DataTransferThrottler throttler,
-                           Canceler canceler) throws InterruptedException {
-    boolean valid;
-
-    try {
-      valid = fastCheck();
-      if (valid) {
-        scanData(throttler, canceler);
-      }
-    } catch (IOException e) {
-      if (Thread.currentThread().isInterrupted()) {
-        throw new InterruptedException("Data scan of container " + containerID +
-            " interrupted.");
-      }
-      handleCorruption(e);
-      valid = false;
+  public ScanResult fullCheck(DataTransferThrottler throttler,
+      Canceler canceler) throws InterruptedException {
+    ScanResult result = fastCheck();
+    if (result.isHealthy()) {
+      result = scanData(throttler, canceler);
     }
 
-    return valid;
-  }
-
-  /**
-   * Check the integrity of the directory structure of the container.
-   */
-  private void checkLayout() throws IOException {
-
-    // is metadataPath accessible as a directory?
-    checkDirPath(metadataPath);
-
-    // is chunksPath accessible as a directory?
-    String chunksPath = onDiskContainerData.getChunksPath();
-    checkDirPath(chunksPath);
-  }
-
-  private void checkDirPath(String path) throws IOException {
-
-    File dirPath = new File(path);
-    String errStr;
-
-    try {
-      if (!dirPath.isDirectory()) {
-        errStr = "Not a directory [" + path + "]";
-        throw new IOException(errStr);
-      }
-    } catch (SecurityException se) {
-      throw new IOException("Security exception checking dir ["
-          + path + "]", se);
+    if (!result.isHealthy() && Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException("Data scan of container " + containerID +
+          " interrupted.");
     }
 
-    String[] ls = dirPath.list();
-    if (ls == null) {
-      // null result implies operation failed
-      errStr = "null listing for directory [" + path + "]";
-      throw new IOException(errStr);
-    }
+    return result;
   }
 
-  private void checkContainerFile() throws IOException {
+  private ScanResult checkContainerFile(File containerFile) {
     /*
      * compare the values in the container file loaded from disk,
      * with the values we are expecting
@@ -184,25 +174,33 @@ public class KeyValueContainerCheck {
     Preconditions
         .checkState(onDiskContainerData != null, "Container File not loaded");
 
-    ContainerUtils.verifyChecksum(onDiskContainerData, checkConfig);
+    try {
+      ContainerUtils.verifyChecksum(onDiskContainerData, checkConfig);
+    } catch (IOException ex) {
+      return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
+          containerFile, ex);
+    }
 
     if (onDiskContainerData.getContainerType()
         != ContainerProtos.ContainerType.KeyValueContainer) {
       String errStr = "Bad Container type in Containerdata for " + containerID;
-      throw new IOException(errStr);
+      return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
+          containerFile, new IOException(errStr));
     }
 
     if (onDiskContainerData.getContainerID() != containerID) {
       String errStr =
           "Bad ContainerID field in Containerdata for " + containerID;
-      throw new IOException(errStr);
+      return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
+          containerFile, new IOException(errStr));
     }
 
     dbType = onDiskContainerData.getContainerDBType();
     if (!dbType.equals(CONTAINER_DB_TYPE_ROCKSDB)) {
       String errStr = "Unknown DBType [" + dbType
           + "] in Container File for  [" + containerID + "]";
-      throw new IOException(errStr);
+      return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
+          containerFile, new IOException(errStr));
     }
 
     KeyValueContainerData kvData = onDiskContainerData;
@@ -211,12 +209,15 @@ public class KeyValueContainerCheck {
           "Bad metadata path in Containerdata for " + containerID + "Expected ["
               + metadataPath + "] Got [" + kvData.getMetadataPath()
               + "]";
-      throw new IOException(errStr);
+      return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
+          containerFile, new IOException(errStr));
     }
+
+    return ScanResult.healthy();
   }
 
-  private void scanData(DataTransferThrottler throttler, Canceler canceler)
-      throws IOException {
+  private ScanResult scanData(DataTransferThrottler throttler,
+      Canceler canceler) {
     /*
      * Check the integrity of the DB inside each container.
      * 1. iterate over each key (Block) and locate the chunks for the block
@@ -234,41 +235,52 @@ public class KeyValueContainerCheck {
       String dbFileErrorMsg = "Unable to access DB File [" + dbFile.toString()
           + "] for Container [" + containerID + "] metadata path ["
           + metadataPath + "]";
-      throw new IOException(dbFileErrorMsg);
+      return ScanResult.unhealthy(ScanResult.FailureType.INACCESSIBLE_DB,
+          dbFile, new IOException(dbFileErrorMsg));
     }
 
     onDiskContainerData.setDbFile(dbFile);
 
-    try (DBHandle db = BlockUtils.getDB(onDiskContainerData, checkConfig);
-        BlockIterator<BlockData> kvIter = db.getStore().getBlockIterator(
-            onDiskContainerData.getContainerID(),
-            onDiskContainerData.getUnprefixedKeyFilter())) {
-
-      while (kvIter.hasNext()) {
-        BlockData block = kvIter.nextBlock();
-
-        // If holding read lock for the entire duration, including wait() calls
-        // in DataTransferThrottler, would effectively make other threads
-        // throttled.
-        // Here try optimistically and retry with the container lock to
-        // make sure reading the latest record. If the record is just removed,
-        // the block should be skipped to scan.
-        try {
-          scanBlock(block, throttler, canceler);
-        } catch (OzoneChecksumException ex) {
-          throw ex;
-        } catch (IOException ex) {
-          if (getBlockDataFromDBWithLock(db, block) == null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Scanned outdated blockData {} in container {}.",
-                  block, containerID);
+    try {
+      try (DBHandle db = BlockUtils.getDB(onDiskContainerData, checkConfig);
+          BlockIterator<BlockData> kvIter = db.getStore().getBlockIterator(
+              onDiskContainerData.getContainerID(),
+              onDiskContainerData.getUnprefixedKeyFilter())) {
+
+        while (kvIter.hasNext()) {
+          BlockData block = kvIter.nextBlock();
+
+          // If holding read lock for the entire duration, including wait()
+          // calls in DataTransferThrottler, would effectively make other
+          // threads throttled.
+          // Here try optimistically and retry with the container lock to
+          // make sure reading the latest record. If the record is just removed,
+          // the block should be skipped to scan.
+          ScanResult result = scanBlock(block, throttler, canceler);
+          if (!result.isHealthy()) {
+            if (result.getFailureType() ==
+                ScanResult.FailureType.MISSING_CHUNK_FILE) {
+              if (getBlockDataFromDBWithLock(db, block) != null) {
+                // Block was not deleted, the failure is legitimate.
+                return result;
+              } else if (LOG.isDebugEnabled()) {
+                // Block may have been deleted during the scan.
+                LOG.debug("Scanned outdated blockData {} in container {}.",
+                    block, containerID);
+              }
+            } else {
+              // All other failures should be treated as errors.
+              return result;
             }
-          } else {
-            throw ex;
           }
         }
       }
+    } catch (IOException ex) {
+      return ScanResult.unhealthy(ScanResult.FailureType.INACCESSIBLE_DB,
+          dbFile, ex);
     }
+
+    return ScanResult.healthy();
   }
 
   /**
@@ -308,13 +320,20 @@ public class KeyValueContainerCheck {
     }
   }
 
-  private void scanBlock(BlockData block, DataTransferThrottler throttler,
-      Canceler canceler) throws IOException {
+  private ScanResult scanBlock(BlockData block, DataTransferThrottler throttler,
+      Canceler canceler) {
     ContainerLayoutVersion layout = onDiskContainerData.getLayoutVersion();
 
     for (ContainerProtos.ChunkInfo chunk : block.getChunks()) {
-      File chunkFile = layout.getChunkFile(onDiskContainerData,
-          block.getBlockID(), ChunkInfo.getFromProtoBuf(chunk));
+      File chunkFile;
+      try {
+        chunkFile = layout.getChunkFile(onDiskContainerData,
+            block.getBlockID(), ChunkInfo.getFromProtoBuf(chunk));
+      } catch (IOException ex) {
+        return ScanResult.unhealthy(
+            ScanResult.FailureType.MISSING_CHUNK_FILE,
+            new File(onDiskContainerData.getChunksPath()), ex);
+      }
 
       if (!chunkFile.exists()) {
         // In EC, client may write empty putBlock in padding block nodes.
@@ -322,21 +341,27 @@ public class KeyValueContainerCheck {
         // the missing chunk file.
         if (block.getChunks().size() > 0 && block
             .getChunks().get(0).getLen() > 0) {
-          throw new IOException(
-              "Missing chunk file " + chunkFile.getAbsolutePath());
+          return ScanResult.unhealthy(ScanResult.FailureType.MISSING_CHUNK_FILE,
+              chunkFile, new IOException("Missing chunk file " +
+                  chunkFile.getAbsolutePath()));
         }
       } else if (chunk.getChecksumData().getType()
           != ContainerProtos.ChecksumType.NONE) {
-        verifyChecksum(block, chunk, chunkFile, layout, throttler,
-            canceler);
+        ScanResult result = verifyChecksum(block, chunk, chunkFile, layout,
+            throttler, canceler);
+        if (!result.isHealthy()) {
+          return result;
+        }
       }
     }
+
+    return ScanResult.healthy();
   }
 
-  private static void verifyChecksum(BlockData block,
+  private static ScanResult verifyChecksum(BlockData block,
       ContainerProtos.ChunkInfo chunk, File chunkFile,
       ContainerLayoutVersion layout,
-      DataTransferThrottler throttler, Canceler canceler) throws IOException {
+      DataTransferThrottler throttler, Canceler canceler) {
     ChecksumData checksumData =
         ChecksumData.getFromProtoBuf(chunk.getChecksumData());
     int checksumCount = checksumData.getChecksums().size();
@@ -371,7 +396,7 @@ public class KeyValueContainerCheck {
         ByteString actual = cal.computeChecksum(buffer)
             .getChecksums().get(0);
         if (!expected.equals(actual)) {
-          throw new OzoneChecksumException(String
+          String message = String
               .format("Inconsistent read for chunk=%s" +
                   " checksum item %d" +
                   " expected checksum %s" +
@@ -381,31 +406,33 @@ public class KeyValueContainerCheck {
                   i,
                   Arrays.toString(expected.toByteArray()),
                   Arrays.toString(actual.toByteArray()),
-                  block.getBlockID()));
+                  block.getBlockID());
+          return ScanResult.unhealthy(
+              ScanResult.FailureType.CORRUPT_CHUNK, chunkFile,
+              new IOException(message));
         }
-
       }
       if (bytesRead != chunk.getLen()) {
-        throw new OzoneChecksumException(String
+        String message = String
             .format("Inconsistent read for chunk=%s expected length=%d"
                     + " actual length=%d for block %s",
                 chunk.getChunkName(),
-                chunk.getLen(), bytesRead, block.getBlockID()));
+                chunk.getLen(), bytesRead, block.getBlockID());
+        return ScanResult.unhealthy(
+            ScanResult.FailureType.INCONSISTENT_CHUNK_LENGTH, chunkFile,
+            new IOException(message));
       }
+    } catch (IOException ex) {
+      return ScanResult.unhealthy(
+          ScanResult.FailureType.MISSING_CHUNK_FILE, chunkFile, ex);
     }
-  }
 
-  private void loadContainerData() throws IOException {
-    File containerFile = KeyValueContainer
-        .getContainerFile(metadataPath, containerID);
+    return ScanResult.healthy();
+  }
 
+  private void loadContainerData(File containerFile) throws IOException {
     onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(containerFile);
     onDiskContainerData.setVolume(volume);
   }
-
-  private void handleCorruption(IOException e) {
-    LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
-            containerID, e);
-  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index fd5b5923d5..f76ac96332 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -90,8 +90,6 @@ public class KeyValueContainerData extends ContainerData {
 
   private long blockCommitSequenceId;
 
-  private int replicaIndex;
-
   static {
     // Initialize YAML fields
     KV_YAML_FIELDS = Lists.newArrayList();
@@ -123,7 +121,6 @@ public class KeyValueContainerData extends ContainerData {
     this.numPendingDeletionBlocks = new AtomicLong(0);
     this.deleteTransactionId = 0;
     this.schemaVersion = source.getSchemaVersion();
-    this.replicaIndex = source.getReplicaIndex();
   }
 
   /**
@@ -326,14 +323,6 @@ public class KeyValueContainerData extends ContainerData {
     metadataTable.put(getPendingDeleteBlockCountKey(), 0L);
   }
 
-  public int getReplicaIndex() {
-    return replicaIndex;
-  }
-
-  public void setReplicaIndex(int replicaIndex) {
-    this.replicaIndex = replicaIndex;
-  }
-
   // NOTE: Below are some helper functions to format keys according
   // to container schemas, we should use them instead of using
   // raw const variables defined.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 279db24c4b..5db14e5d87 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -116,6 +117,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerDataProto.State.RECOVERING;
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -381,12 +383,14 @@ public class KeyValueHandler extends Handler {
     }
 
     if (created) {
+      ContainerLogger.logOpen(newContainerData);
       try {
         sendICR(newContainer);
       } catch (StorageContainerException ex) {
         return ContainerUtils.logAndReturnError(LOG, ex, request);
       }
     }
+
     return getSuccessResponse(request);
   }
 
@@ -1033,6 +1037,7 @@ public class KeyValueHandler extends Handler {
     HddsVolume targetVolume = originalContainerData.getVolume();
     populateContainerPathFields(container, targetVolume);
     container.importContainerData(rawContainerStream, packer);
+    ContainerLogger.logImported(containerData);
     sendICR(container);
     return container;
 
@@ -1045,6 +1050,7 @@ public class KeyValueHandler extends Handler {
       throws IOException {
     final KeyValueContainer kvc = (KeyValueContainer) container;
     kvc.exportContainerData(outputStream, packer);
+    ContainerLogger.logExported(container.getContainerData());
   }
 
   @Override
@@ -1059,8 +1065,10 @@ public class KeyValueHandler extends Handler {
         if (state == RECOVERING) {
           containerSet.removeRecoveringContainer(
               container.getContainerData().getContainerID());
+          ContainerLogger.logRecovered(container.getContainerData());
         }
         container.markContainerForClose();
+        ContainerLogger.logClosing(container.getContainerData());
         sendICR(container);
       }
     } finally {
@@ -1069,7 +1077,7 @@ public class KeyValueHandler extends Handler {
   }
 
   @Override
-  public void markContainerUnhealthy(Container container)
+  public void markContainerUnhealthy(Container container, ScanResult reason)
       throws StorageContainerException {
     container.writeLock();
     try {
@@ -1095,6 +1103,10 @@ public class KeyValueHandler extends Handler {
         LOG.warn("Unexpected error while marking container {} unhealthy",
             containerID, ex);
       } finally {
+        // Even if the container file is corrupted/missing and the unhealthy
+        // update fails, the unhealthy state is kept in memory and sent to
+        // SCM. Write a corresponding entry to the container log as well.
+        ContainerLogger.logUnhealthy(container.getContainerData(), reason);
         sendICR(container);
       }
     } finally {
@@ -1103,7 +1115,7 @@ public class KeyValueHandler extends Handler {
   }
 
   @Override
-  public void quasiCloseContainer(Container container)
+  public void quasiCloseContainer(Container container, String reason)
       throws IOException {
     container.writeLock();
     try {
@@ -1122,6 +1134,7 @@ public class KeyValueHandler extends Handler {
                 .getContainerID() + " while in " + state + " state.", error);
       }
       container.quasiClose();
+      ContainerLogger.logQuasiClosed(container.getContainerData(), reason);
       sendICR(container);
     } finally {
       container.writeUnlock();
@@ -1154,6 +1167,7 @@ public class KeyValueHandler extends Handler {
                 .getContainerID() + " while in " + state + " state.", error);
       }
       container.close();
+      ContainerLogger.logClosed(container.getContainerData());
       sendICR(container);
     } finally {
       container.writeUnlock();
@@ -1377,6 +1391,7 @@ public class KeyValueHandler extends Handler {
     // Avoid holding write locks for disk operations
     container.delete();
     container.getContainerData().setState(State.DELETED);
+    ContainerLogger.logDeleted(container.getContainerData(), force);
     sendICR(container);
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
index 39bb930386..c3cb27deb8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java
@@ -32,6 +32,8 @@ import java.time.Instant;
 import java.util.Iterator;
 import java.util.Optional;
 
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+
 /**
  * Data scanner that full checks a volume. Each volume gets a separate thread.
  */
@@ -84,9 +86,12 @@ public class BackgroundContainerDataScanner extends
     ContainerData containerData = c.getContainerData();
     long containerId = containerData.getContainerID();
     logScanStart(containerData);
-    if (!c.scanData(throttler, canceler)) {
+    ScanResult result = c.scanData(throttler, canceler);
+    if (!result.isHealthy()) {
+      LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
+          containerId, result.getException());
       metrics.incNumUnHealthyContainers();
-      controller.markContainerUnhealthy(containerId);
+      controller.markContainerUnhealthy(containerId, result);
     }
 
     metrics.incNumContainersScanned();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
index 75f6488301..018870e21d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java
@@ -75,12 +75,16 @@ public class BackgroundContainerMetadataScanner extends
       return;
     }
 
-    // Do not update the scan timestamp since this was just a metadata scan,
-    // not a full scan.
-    if (!container.scanMetaData()) {
+    Container.ScanResult result = container.scanMetaData();
+    if (!result.isHealthy()) {
+      LOG.error("Corruption detected in container [{}]. Marking it UNHEALTHY.",
+          containerID, result.getException());
       metrics.incNumUnHealthyContainers();
-      controller.markContainerUnhealthy(containerID);
+      controller.markContainerUnhealthy(containerID, result);
     }
+
+    // Do not update the scan timestamp after the scan since this was just a
+    // metadata scan, not a full data scan.
     metrics.incNumContainersScanned();
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 576069fb16..feb5805387 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -41,6 +41,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
+
 /**
  * Control plane for container management in datanode.
  */
@@ -107,13 +109,14 @@ public class ContainerController {
    * Marks the container as UNHEALTHY.
    *
    * @param containerId Id of the container to update
+   * @param reason The reason the container was marked unhealthy
    * @throws IOException in case of exception
    */
-  public void markContainerUnhealthy(final long containerId)
+  public void markContainerUnhealthy(final long containerId, ScanResult reason)
           throws IOException {
     Container container = containerSet.getContainer(containerId);
     if (container != null) {
-      getHandler(container).markContainerUnhealthy(container);
+      getHandler(container).markContainerUnhealthy(container, reason);
     } else {
       LOG.warn("Container {} not found, may be deleted, skip mark UNHEALTHY",
           containerId);
@@ -135,11 +138,14 @@ public class ContainerController {
    * Quasi closes a container given its id.
    *
    * @param containerId Id of the container to quasi close
+   * @param reason The reason the container was quasi closed, for logging
+   *               purposes.
    * @throws IOException in case of exception
    */
-  public void quasiCloseContainer(final long containerId) throws IOException {
+  public void quasiCloseContainer(final long containerId, String reason)
+      throws IOException {
     final Container container = containerSet.getContainer(containerId);
-    getHandler(container).quasiCloseContainer(container);
+    getHandler(container).quasiCloseContainer(container, reason);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
index 4f0aa042a2..ec3256bdc5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,9 +130,15 @@ public final class OnDemandContainerDataScanner {
     try {
       ContainerData containerData = container.getContainerData();
       logScanStart(containerData);
-      if (!container.scanData(instance.throttler, instance.canceler)) {
+
+      ScanResult result =
+          container.scanData(instance.throttler, instance.canceler);
+      if (!result.isHealthy()) {
+        LOG.error("Corruption detected in container [{}]." +
+                "Marking it UNHEALTHY.", containerId, result.getException());
         instance.metrics.incNumUnHealthyContainers();
-        instance.containerController.markContainerUnhealthy(containerId);
+        instance.containerController.markContainerUnhealthy(containerId,
+            result);
       }
 
       instance.metrics.incNumContainersScanned();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index a8fc6e4250..530a59c35f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
@@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Random;
@@ -172,7 +174,7 @@ public final class ContainerTestUtils {
 
   public static void setupMockContainer(
       Container<ContainerData> c, boolean shouldScanData,
-      boolean scanMetaDataSuccess, boolean scanDataSuccess,
+      ScanResult metadataScanResult, ScanResult dataScanResult,
       AtomicLong containerIdSeq, HddsVolume vol) {
     ContainerData data = mock(ContainerData.class);
     when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
@@ -183,14 +185,23 @@ public final class ContainerTestUtils {
 
     try {
       when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
-          .thenReturn(scanDataSuccess);
-      Mockito.lenient().when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
+          .thenReturn(dataScanResult);
+      Mockito.lenient().when(c.scanMetaData()).thenReturn(metadataScanResult);
     } catch (InterruptedException ex) {
       // Mockito.when invocations will not throw this exception. It is just
       // required for compilation.
     }
   }
 
+  /**
+   * Construct an unhealthy scan result to use for testing purposes.
+   */
+  public static ScanResult getUnhealthyScanResult() {
+    return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CHUNK,
+        new File(""),
+        new IOException("Fake corruption failure for testing"));
+  }
+
   public static KeyValueContainer addContainerToDeletedDir(
       HddsVolume volume, String clusterId,
       OzoneConfiguration conf, String schemaVersion)
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index f8a9c360c0..4d3d435ed2 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -126,7 +126,7 @@ public class TestCloseContainerCommandHandler {
     verify(writeChannel)
         .submitRequest(any(), eq(pipelineID.getProtobuf()));
     verify(containerHandler, never())
-        .quasiCloseContainer(container);
+        .quasiCloseContainer(eq(container), any());
   }
 
   @Test
@@ -141,7 +141,7 @@ public class TestCloseContainerCommandHandler {
     // Container in CLOSING state is moved to UNHEALTHY if pipeline does not
     // exist. Container should not exist in CLOSING state without a pipeline.
     verify(containerHandler)
-        .quasiCloseContainer(container);
+        .quasiCloseContainer(eq(container), any());
   }
 
   @Test
@@ -192,7 +192,7 @@ public class TestCloseContainerCommandHandler {
     verify(writeChannel)
         .submitRequest(any(), any());
     verify(containerHandler, never())
-        .quasiCloseContainer(container);
+        .quasiCloseContainer(eq(container), any());
     verify(containerHandler, never())
         .closeContainer(container);
   }
@@ -210,7 +210,7 @@ public class TestCloseContainerCommandHandler {
     verify(containerHandler, never())
         .markContainerForClose(container);
     verify(containerHandler, never())
-        .quasiCloseContainer(container);
+        .quasiCloseContainer(eq(container), any());
     verify(containerHandler, never())
         .closeContainer(container);
     verify(writeChannel, never())
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
index e8384caa0d..c3a814b7c8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerCheck.java
@@ -74,14 +74,14 @@ public class TestKeyValueContainerCheck
             containerID, containerData.getVolume(), container);
 
     // first run checks on a Open Container
-    boolean valid = kvCheck.fastCheck();
+    boolean valid = kvCheck.fastCheck().isHealthy();
     assertTrue(valid);
 
     container.close();
 
     // next run checks on a Closed Container
     valid = kvCheck.fullCheck(new DataTransferThrottler(
-        c.getBandwidthPerVolume()), null);
+        c.getBandwidthPerVolume()), null).isHealthy();
     assertTrue(valid);
   }
 
@@ -131,12 +131,12 @@ public class TestKeyValueContainerCheck
     }
 
     // metadata check should pass.
-    boolean valid = kvCheck.fastCheck();
+    boolean valid = kvCheck.fastCheck().isHealthy();
     assertTrue(valid);
 
     // checksum validation should fail.
     valid = kvCheck.fullCheck(new DataTransferThrottler(
-            sc.getBandwidthPerVolume()), null);
+            sc.getBandwidthPerVolume()), null).isHealthy();
     assertFalse(valid);
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 3c9c41c777..8fd8d08f24 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -160,13 +161,15 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
     // When volume is failed, the call to mark the container unhealthy should
     // be ignored.
     Mockito.when(mockVolume.isFailed()).thenReturn(true);
-    handler.markContainerUnhealthy(container);
+    handler.markContainerUnhealthy(container,
+        ContainerTestUtils.getUnhealthyScanResult());
     Mockito.verify(mockIcrSender, Mockito.never()).send(Mockito.any());
 
     // When volume is healthy, ICR should be sent when container is marked
     // unhealthy.
     Mockito.when(mockVolume.isFailed()).thenReturn(false);
-    handler.markContainerUnhealthy(container);
+    handler.markContainerUnhealthy(container,
+        ContainerTestUtils.getUnhealthyScanResult());
     Mockito.verify(mockIcrSender, Mockito.atMostOnce()).send(Mockito.any());
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
index 134ec050e2..4b37df0e14 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -37,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -149,9 +151,9 @@ public class TestBackgroundContainerDataScanner extends
   @Override
   public void testUnhealthyContainerNotRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(true);
+    when(unhealthy.scanMetaData()).thenReturn(ScanResult.healthy());
     when(unhealthy.scanData(any(DataTransferThrottler.class),
-        any(Canceler.class))).thenReturn(false);
+        any(Canceler.class))).thenReturn(getUnhealthyScanResult());
 
     setContainers(unhealthy, healthy);
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
index ff542d667a..81f6ba3d40 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
@@ -37,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -130,10 +132,10 @@ public class TestBackgroundContainerMetadataScanner 
extends
   @Override
   public void testUnhealthyContainerNotRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(false);
+    when(unhealthy.scanMetaData()).thenReturn(getUnhealthyScanResult());
     when(unhealthy.scanData(
         any(DataTransferThrottler.class), any(Canceler.class)))
-        .thenReturn(true);
+        .thenReturn(ScanResult.healthy());
 
     setContainers(unhealthy, healthy);
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
index 1298eb8cad..b78d201791 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
@@ -41,8 +42,11 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
 import static 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.CONTAINER_SCAN_MIN_GAP_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -142,7 +146,7 @@ public abstract class TestContainerScannersAbstract {
       Container<?> container, VerificationMode invocationTimes)
       throws Exception {
     Mockito.verify(controller, invocationTimes).markContainerUnhealthy(
-        container.getContainerData().getContainerID());
+        eq(container.getContainerData().getContainerID()), any());
   }
 
   /**
@@ -183,20 +187,24 @@ public abstract class TestContainerScannersAbstract {
   private ContainerController mockContainerController() {
     // healthy container
     ContainerTestUtils.setupMockContainer(healthy,
-        true, true, true, CONTAINER_SEQ_ID, vol);
+        true, ScanResult.healthy(), ScanResult.healthy(),
+        CONTAINER_SEQ_ID, vol);
 
     // Open container (only metadata can be scanned)
     ContainerTestUtils.setupMockContainer(openContainer,
-        false, true, false, CONTAINER_SEQ_ID, vol);
+        false, ScanResult.healthy(), ScanResult.healthy(),
+        CONTAINER_SEQ_ID, vol);
 
     // unhealthy container (corrupt data)
     ContainerTestUtils.setupMockContainer(corruptData,
-        true, true, false, CONTAINER_SEQ_ID, vol);
+        true, ScanResult.healthy(), getUnhealthyScanResult(),
+        CONTAINER_SEQ_ID, vol);
 
     // unhealthy container (corrupt metadata). To simulate container still
     // being open while metadata is corrupted, shouldScanData will return 
false.
     ContainerTestUtils.setupMockContainer(openCorruptMetadata,
-        false, false, false, CONTAINER_SEQ_ID, vol);
+        false, getUnhealthyScanResult(), ScanResult.healthy(),
+        CONTAINER_SEQ_ID, vol);
 
     containers.addAll(Arrays.asList(healthy, corruptData, 
openCorruptMetadata));
     ContainerController mock = mock(ContainerController.class);
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
index 50fb1355e2..802cd81309 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Assertions;
@@ -41,6 +42,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyScanResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -125,12 +127,12 @@ public class TestOnDemandContainerDataScanner extends
     OnDemandContainerDataScanner.init(conf, controller);
     //Given a container that has not finished scanning
     CountDownLatch latch = new CountDownLatch(1);
-    Mockito.lenient().when(corruptData.scanData(
+    Mockito.when(corruptData.scanData(
             OnDemandContainerDataScanner.getThrottler(),
             OnDemandContainerDataScanner.getCanceler()))
-        .thenAnswer((Answer<Boolean>) invocation -> {
+        .thenAnswer((Answer<ScanResult>) invocation -> {
           latch.await();
-          return false;
+          return getUnhealthyScanResult();
         });
     Optional<Future<?>> onGoingScan = OnDemandContainerDataScanner
         .scanContainer(corruptData);
@@ -143,8 +145,8 @@ public class TestOnDemandContainerDataScanner extends
     Assertions.assertFalse(secondScan.isPresent());
     latch.countDown();
     onGoingScan.get().get();
-    Mockito.verify(controller, atLeastOnce()).
-        
markContainerUnhealthy(corruptData.getContainerData().getContainerID());
+    Mockito.verify(controller, atLeastOnce()).markContainerUnhealthy(
+        eq(corruptData.getContainerData().getContainerID()), any());
   }
 
   @Test
@@ -245,10 +247,10 @@ public class TestOnDemandContainerDataScanner extends
   @Override
   public void testUnhealthyContainerNotRescanned() throws Exception {
     Container<?> unhealthy = mockKeyValueContainer();
-    when(unhealthy.scanMetaData()).thenReturn(true);
+    when(unhealthy.scanMetaData()).thenReturn(ScanResult.healthy());
     when(unhealthy.scanData(
         any(DataTransferThrottler.class), any(Canceler.class)))
-        .thenReturn(false);
+        .thenReturn(getUnhealthyScanResult());
 
     // First iteration should find the unhealthy container.
     scanContainer(unhealthy);
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching 
b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 2ad8c7419a..7553ef42e5 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -83,6 +83,7 @@ run mkdir -p ./tests
 run cp -r "${ROOT}/hadoop-hdds/common/src/main/conf/" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/om-audit-log4j2.properties" 
"etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/dn-audit-log4j2.properties" 
"etc/hadoop"
+run cp 
"${ROOT}/hadoop-ozone/dist/src/shell/conf/dn-container-log4j2.properties" 
"etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/scm-audit-log4j2.properties" 
"etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/s3g-audit-log4j2.properties" 
"etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/shell/conf/ozone-site.xml" "etc/hadoop"
diff --git a/hadoop-ozone/dist/src/shell/conf/dn-container-log4j2.properties 
b/hadoop-ozone/dist/src/shell/conf/dn-container-log4j2.properties
new file mode 100644
index 0000000000..7a37a3693b
--- /dev/null
+++ b/hadoop-ozone/dist/src/shell/conf/dn-container-log4j2.properties
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership.  The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# <p>
+# http://www.apache.org/licenses/LICENSE-2.0
+# <p>
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+name=PropertiesConfig
+
+# Checks for config change periodically and reloads
+monitorInterval=30
+
+# Log Levels are organized from most specific to least:
+# OFF (most specific, no logging)
+# FATAL (most specific, little data)
+# ERROR
+# WARN
+# INFO
+# DEBUG
+# TRACE (least specific, a lot of data)
+# ALL (least specific, all data)
+
+# Uncomment following section to enable logging to console appender also
+#appenders=console, rollingContainer
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %msg | %throwable{3} 
%n
+
+# Comment out this line when using both console and rolling appenders
+appenders=rollingContainer
+
+# Rolling File Appender with size thresholds only.
+# Rolling is triggered when the log file size threshold is breached.
+# The rolled over file is compressed by default
+appender.rollingContainer.type=RollingFile
+appender.rollingContainer.name=RollingContainer
+appender.rollingContainer.fileName 
=${sys:hadoop.log.dir}/dn-container-${hostName}.log
+appender.rollingContainer.filePattern=${sys:hadoop.log.dir}/dn-container-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
+appender.rollingContainer.layout.type=PatternLayout
+appender.rollingContainer.layout.pattern=%d{DEFAULT} | %-5level | %msg | 
%throwable{3} %n
+appender.rollingContainer.policies.type=Policies
+appender.rollingContainer.policies.size.type=SizeBasedTriggeringPolicy
+appender.rollingContainer.policies.size.size=256MB
+appender.rollingContainer.strategy.type=DefaultRolloverStrategy
+appender.rollingContainer.strategy.max=50
+appender.rollingContainer.strategy.delete.type=Delete
+appender.rollingContainer.strategy.delete.basePath=${sys:hadoop.log.dir}
+appender.rollingContainer.strategy.delete.maxDepth=1
+appender.rollingContainer.strategy.delete.ifFileName.type=IfFileName
+appender.rollingContainer.strategy.delete.ifFileName.glob=dn-container-*.log.gz
+
+loggers=container
+logger.container.name=ContainerLog
+logger.container.level=INFO
+logger.container.appenderRefs=rollingContainer
+logger.container.appenderRef.file.ref=RollingContainer
+
+rootLogger.level=INFO
+#rootLogger.appenderRefs=stdout
+#rootLogger.appenderRef.stdout.ref=STDOUT
diff --git a/hadoop-ozone/dist/src/shell/ozone/ozone 
b/hadoop-ozone/dist/src/shell/ozone/ozone
index c2b9364b0f..13ca5a2323 100755
--- a/hadoop-ozone/dist/src/shell/ozone/ozone
+++ b/hadoop-ozone/dist/src/shell/ozone/ozone
@@ -120,7 +120,7 @@ function ozonecmd_case
       OZONE_SUBCMD_SUPPORTDAEMONIZATION="true"
       ozone_deprecate_envvar HDDS_DN_OPTS OZONE_DATANODE_OPTS
       OZONE_DATANODE_OPTS="${RATIS_OPTS} ${OZONE_DATANODE_OPTS}"
-      
OZONE_DATANODE_OPTS="-Dlog4j.configurationFile=${OZONE_CONF_DIR}/dn-audit-log4j2.properties
 ${OZONE_DATANODE_OPTS}"
+      
OZONE_DATANODE_OPTS="-Dlog4j.configurationFile=${OZONE_CONF_DIR}/dn-audit-log4j2.properties,${OZONE_CONF_DIR}/dn-container-log4j2.properties
 ${OZONE_DATANODE_OPTS}"
       
OZONE_DATANODE_OPTS="-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
 ${OZONE_DATANODE_OPTS} ${OZONE_MODULE_ACCESS_ARGS}"
       OZONE_CLASSNAME=org.apache.hadoop.ozone.HddsDatanodeService
       OZONE_RUN_ARTIFACT_NAME="ozone-datanode"
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
index 41c44eebdc..d8eb73fb0d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java
@@ -21,14 +21,17 @@ package org.apache.hadoop.ozone.dn.scanner;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.BackgroundContainerDataScanner;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +45,7 @@ public class TestBackgroundContainerDataScannerIntegration
     extends TestContainerScannerIntegrationAbstract {
 
   private final ContainerCorruptions corruption;
+  private final LogCapturer logCapturer;
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> supportedCorruptionTypes() {
@@ -69,6 +73,8 @@ public class TestBackgroundContainerDataScannerIntegration
   public TestBackgroundContainerDataScannerIntegration(
       ContainerCorruptions corruption) {
     this.corruption = corruption;
+    logCapturer = LogCapturer.captureLogs(
+            LoggerFactory.getLogger(ContainerLogger.LOG_NAME));
   }
 
   /**
@@ -91,5 +97,6 @@ public class TestBackgroundContainerDataScannerIntegration
 
     // Wait for SCM to get a report of the unhealthy replica.
     waitForScmToSeeUnhealthyReplica(containerID);
+    corruption.assertLogged(logCapturer);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
index 5dadae316f..0713aeacf1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.dn.scanner;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.BackgroundContainerMetadataScanner;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
@@ -30,6 +31,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -45,6 +47,7 @@ public class TestBackgroundContainerMetadataScannerIntegration
     extends TestContainerScannerIntegrationAbstract {
 
   private final ContainerCorruptions corruption;
+  private final GenericTestUtils.LogCapturer logCapturer;
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> supportedCorruptionTypes() {
@@ -81,6 +84,8 @@ public class TestBackgroundContainerMetadataScannerIntegration
   public TestBackgroundContainerMetadataScannerIntegration(
       ContainerCorruptions corruption) {
     this.corruption = corruption;
+    logCapturer = GenericTestUtils.LogCapturer.captureLogs(
+        LoggerFactory.getLogger(ContainerLogger.LOG_NAME));
   }
 
   /**
@@ -116,5 +121,6 @@ public class 
TestBackgroundContainerMetadataScannerIntegration
     // Once the unhealthy replica is reported, the open container's lifecycle
     // state in SCM should move to closed.
     waitForScmToCloseContainer(openContainerID);
+    corruption.assertLogged(logCapturer);
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
index bf6389234b..0d229ef361 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -71,6 +72,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 /**
  * This class tests the data scanner functionality.
@@ -230,7 +232,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       Assert.assertFalse(chunksDir.exists());
-    }),
+    }, ScanResult.FailureType.MISSING_CHUNKS_DIR),
 
     MISSING_METADATA_DIR(container -> {
       File metadataDir =
@@ -243,13 +245,13 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       Assert.assertFalse(metadataDir.exists());
-    }),
+    }, ScanResult.FailureType.MISSING_METADATA_DIR),
 
     MISSING_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       Assert.assertTrue(containerFile.delete());
       Assert.assertFalse(containerFile.exists());
-    }),
+    }, ScanResult.FailureType.MISSING_CONTAINER_FILE),
 
     MISSING_CONTAINER_DIR(container -> {
       File containerDir =
@@ -261,7 +263,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
         throw new UncheckedIOException(ex);
       }
       Assert.assertFalse(containerDir.exists());
-    }),
+    }, ScanResult.FailureType.MISSING_CONTAINER_DIR),
 
     MISSING_BLOCK(container -> {
       File chunksDir = new File(
@@ -275,17 +277,17 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           throw new UncheckedIOException(ex);
         }
       }
-    }),
+    }, ScanResult.FailureType.MISSING_CHUNK_FILE),
 
     CORRUPT_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       corruptFile(containerFile);
-    }),
+    }, ScanResult.FailureType.CORRUPT_CONTAINER_FILE),
 
     TRUNCATED_CONTAINER_FILE(container -> {
       File containerFile = container.getContainerFile();
       truncateFile(containerFile);
-    }),
+    }, ScanResult.FailureType.CORRUPT_CONTAINER_FILE),
 
     CORRUPT_BLOCK(container -> {
       File chunksDir = new 
File(container.getContainerData().getContainerPath(),
@@ -295,7 +297,7 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           .findFirst();
       Assert.assertTrue(blockFile.isPresent());
       corruptFile(blockFile.get());
-    }),
+    }, ScanResult.FailureType.CORRUPT_CHUNK),
 
     TRUNCATED_BLOCK(container -> {
       File chunksDir = new 
File(container.getContainerData().getContainerPath(),
@@ -305,19 +307,31 @@ public abstract class 
TestContainerScannerIntegrationAbstract {
           .findFirst();
       Assert.assertTrue(blockFile.isPresent());
       truncateFile(blockFile.get());
-    });
+    }, ScanResult.FailureType.INCONSISTENT_CHUNK_LENGTH);
 
     private final Consumer<Container<?>> corruption;
+    private final ScanResult.FailureType expectedResult;
     private static final Random RANDOM = new Random();
 
-    ContainerCorruptions(Consumer<Container<?>> corruption) {
+    ContainerCorruptions(Consumer<Container<?>> corruption,
+                         ScanResult.FailureType expectedResult) {
       this.corruption = corruption;
+      this.expectedResult = expectedResult;
+
     }
 
     public void applyTo(Container<?> container) {
       corruption.accept(container);
     }
 
+    /**
+     * Check that the correct corruption type was written to the container log.
+     */
+    public void assertLogged(LogCapturer logCapturer) {
+      Assert.assertTrue(logCapturer.getOutput()
+          .contains(expectedResult.toString()));
+    }
+
     /**
      * Get all container corruption types as parameters for junit 4
      * parameterized tests, except the ones specified.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
index 1b8752b9ea..2c60c577f0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.dn.scanner;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
 import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
@@ -29,6 +30,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 
@@ -42,6 +44,7 @@ public class TestOnDemandContainerDataScannerIntegration
     extends TestContainerScannerIntegrationAbstract {
 
   private final ContainerCorruptions corruption;
+  private final GenericTestUtils.LogCapturer logCapturer;
 
   /**
    The on-demand container scanner is triggered by errors on the block read
@@ -86,6 +89,8 @@ public class TestOnDemandContainerDataScannerIntegration
   public TestOnDemandContainerDataScannerIntegration(
       ContainerCorruptions corruption) {
     this.corruption = corruption;
+    logCapturer = GenericTestUtils.LogCapturer.captureLogs(
+        LoggerFactory.getLogger(ContainerLogger.LOG_NAME));
   }
 
   /**
@@ -113,5 +118,6 @@ public class TestOnDemandContainerDataScannerIntegration
 
     // Wait for SCM to get a report of the unhealthy replica.
     waitForScmToSeeUnhealthyReplica(containerID);
+    corruption.assertLogged(logCapturer);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to