This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 200b330c47 HDDS-9695. Do not show empty containers as missing in Recon UI (#5620)
200b330c47 is described below

commit 200b330c47908e21fe1b97efea1299ae198de306
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Tue Dec 5 08:55:42 2023 +0530

    HDDS-9695. Do not show empty containers as missing in Recon UI (#5620)
---
 .../apache/hadoop/ozone/recon/TestReconTasks.java  | 125 ++++++++-
 .../recon/schema/ContainerSchemaDefinition.java    |   1 +
 .../apache/hadoop/ozone/recon/ReconConstants.java  |   3 +
 .../ozone/recon/fsck/ContainerHealthStatus.java    |  28 +-
 .../ozone/recon/fsck/ContainerHealthTask.java      | 177 ++++++++++--
 .../scm/ReconStorageContainerManagerFacade.java    |   6 +-
 .../recon/fsck/TestContainerHealthStatus.java      |  21 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |  54 +++-
 .../TestContainerHealthTaskRecordGenerator.java    | 301 ++++++++++++++++++---
 9 files changed, 644 insertions(+), 72 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index 5ecf0890ac..965dd2f525 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
 import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers;
@@ -40,6 +45,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.event.Level;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
@@ -71,6 +77,8 @@ public class TestReconTasks {
     cluster =  MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
         .includeRecon(true).build();
     cluster.waitForClusterToBeReady();
+    GenericTestUtils.setLogLevel(SCMDatanodeHeartbeatDispatcher.LOG,
+        Level.DEBUG);
   }
 
   @AfterEach
@@ -120,6 +128,9 @@ public class TestReconTasks {
     ReconStorageContainerManagerFacade reconScm =
         (ReconStorageContainerManagerFacade)
             cluster.getReconServer().getReconStorageContainerManager();
+    ReconContainerMetadataManager reconContainerMetadataManager =
+        cluster.getReconServer().getReconContainerMetadataManager();
+
     StorageContainerManager scm = cluster.getStorageContainerManager();
     PipelineManager reconPipelineManager = reconScm.getPipelineManager();
     PipelineManager scmPipelineManager = scm.getPipelineManager();
@@ -135,6 +146,13 @@ public class TestReconTasks {
         scmContainerManager
             .allocateContainer(RatisReplicationConfig.getInstance(ONE), "test");
     long containerID = containerInfo.getContainerID();
+
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      reconContainerMetadataManager
+          .batchStoreContainerKeyCounts(rdbBatchOperation, containerID, 2L);
+      reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
+    }
+
     Pipeline pipeline =
         scmPipelineManager.getPipeline(containerInfo.getPipelineID());
     XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
@@ -147,7 +165,7 @@ public class TestReconTasks {
     // Bring down the Datanode that had the container replica.
     cluster.shutdownHddsDatanode(pipeline.getFirstNode());
 
-    LambdaTestUtils.await(120000, 10000, () -> {
+    LambdaTestUtils.await(120000, 6000, () -> {
       List<UnhealthyContainers> allMissingContainers =
           reconContainerManager.getContainerSchemaManager()
               .getUnhealthyContainers(
@@ -166,5 +184,110 @@ public class TestReconTasks {
                   0, 1000);
       return (allMissingContainers.isEmpty());
     });
+    IOUtils.closeQuietly(client);
+  }
+
+  /**
+   * This test verifies the count of MISSING and EMPTY_MISSING containers.
+   * The following steps are performed in a single DN cluster.
+   *    --- Allocate a container in SCM.
+   *    --- Client writes the chunk and put block to only DN successfully.
+   *    --- Shuts down the only DN.
+   *    --- Since container to key mapping doesn't have any key mapped to
+   *        container, missing container will be marked EMPTY_MISSING.
+   *    --- Add a key mapping entry to key container mapping table for the
+   *        container added.
+   *    --- Now container will no longer be marked as EMPTY_MISSING and just
+   *        as MISSING.
+   *    --- Restart the only DN in cluster.
+   *    --- Now container no longer will be marked as MISSING.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testEmptyMissingContainerDownNode() throws Exception {
+    ReconStorageContainerManagerFacade reconScm =
+        (ReconStorageContainerManagerFacade)
+            cluster.getReconServer().getReconStorageContainerManager();
+    ReconContainerMetadataManager reconContainerMetadataManager =
+        cluster.getReconServer().getReconContainerMetadataManager();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    PipelineManager reconPipelineManager = reconScm.getPipelineManager();
+    PipelineManager scmPipelineManager = scm.getPipelineManager();
+
+    // Make sure Recon's pipeline state is initialized.
+    LambdaTestUtils.await(60000, 1000,
+        () -> (reconPipelineManager.getPipelines().size() >= 1));
+
+    ContainerManager scmContainerManager = scm.getContainerManager();
+    ReconContainerManager reconContainerManager =
+        (ReconContainerManager) reconScm.getContainerManager();
+    ContainerInfo containerInfo =
+        scmContainerManager
+            .allocateContainer(RatisReplicationConfig.getInstance(ONE), "test");
+    long containerID = containerInfo.getContainerID();
+
+    Pipeline pipeline =
+        scmPipelineManager.getPipeline(containerInfo.getPipelineID());
+    XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
+    runTestOzoneContainerViaDataNode(containerID, client);
+
+    // Make sure Recon got the container report with new container.
+    assertEquals(scmContainerManager.getContainers(),
+        reconContainerManager.getContainers());
+
+    // Bring down the Datanode that had the container replica.
+    cluster.shutdownHddsDatanode(pipeline.getFirstNode());
+
+    LambdaTestUtils.await(25000, 1000, () -> {
+      List<UnhealthyContainers> allEmptyMissingContainers =
+          reconContainerManager.getContainerSchemaManager()
+              .getUnhealthyContainers(
+                  ContainerSchemaDefinition.UnHealthyContainerStates.
+                      EMPTY_MISSING,
+                  0, 1000);
+      return (allEmptyMissingContainers.size() == 1);
+    });
+
+    // Now add a container to key mapping count as 3. This data is used to
+    // identify if container is empty in terms of keys mapped to container.
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+      reconContainerMetadataManager
+          .batchStoreContainerKeyCounts(rdbBatchOperation, containerID, 3L);
+      reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
+    }
+
+    // Verify again and now container is not empty missing but just missing.
+    LambdaTestUtils.await(25000, 1000, () -> {
+      List<UnhealthyContainers> allMissingContainers =
+          reconContainerManager.getContainerSchemaManager()
+              .getUnhealthyContainers(
+                  ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
+                  0, 1000);
+      return (allMissingContainers.size() == 1);
+    });
+
+    LambdaTestUtils.await(25000, 1000, () -> {
+      List<UnhealthyContainers> allEmptyMissingContainers =
+          reconContainerManager.getContainerSchemaManager()
+              .getUnhealthyContainers(
+                  ContainerSchemaDefinition.UnHealthyContainerStates.
+                      EMPTY_MISSING,
+                  0, 1000);
+      return (allEmptyMissingContainers.isEmpty());
+    });
+
+    // Now restart the datanode and verify the container is no longer missing.
+    cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
+    LambdaTestUtils.await(25000, 1000, () -> {
+      List<UnhealthyContainers> allMissingContainers =
+          reconContainerManager.getContainerSchemaManager()
+              .getUnhealthyContainers(
+                  ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
+                  0, 1000);
+      return (allMissingContainers.isEmpty());
+    });
+
+    IOUtils.closeQuietly(client);
   }
 }
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java
index 1a045636bb..43e2d728b7 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java
@@ -47,6 +47,7 @@ public class ContainerSchemaDefinition implements ReconSchemaDefinition {
    */
   public enum UnHealthyContainerStates {
     MISSING,
+    EMPTY_MISSING,
     UNDER_REPLICATED,
     OVER_REPLICATED,
     MIS_REPLICATED,
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 5e2e1a837f..134092146e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -58,6 +58,9 @@ public final class ReconConstants {
   public static final String RECON_ENTITY_PATH = "path";
   public static final String RECON_ENTITY_TYPE = "entityType";
   public static final String RECON_ACCESS_METADATA_START_DATE = "startDate";
+  public static final String CONTAINER_COUNT = "CONTAINER_COUNT";
+  public static final String TOTAL_KEYS = "TOTAL_KEYS";
+  public static final String TOTAL_USED_BYTES = "TOTAL_USED_BYTES";
 
   // 1125899906842624L = 1PB
   public static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
index b004abd432..7785e01a37 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthStatus.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -39,11 +42,16 @@ public class ContainerHealthStatus {
   private int replicaDelta;
   private Set<ContainerReplica> healthyReplicas;
   private ContainerPlacementStatus placementStatus;
+  private ReconContainerMetadataManager reconContainerMetadataManager;
   private int numReplicas;
+  private long numKeys;
 
   ContainerHealthStatus(ContainerInfo container,
                         Set<ContainerReplica> healthyReplicas,
-                        PlacementPolicy placementPolicy) {
+                        PlacementPolicy placementPolicy,
+                        ReconContainerMetadataManager
+                            reconContainerMetadataManager) {
+    this.reconContainerMetadataManager = reconContainerMetadataManager;
     this.container = container;
     int repFactor = container.getReplicationConfig().getRequiredNodes();
     this.healthyReplicas = healthyReplicas
@@ -54,6 +62,7 @@ public class ContainerHealthStatus {
     this.replicaDelta = repFactor - this.healthyReplicas.size();
     this.placementStatus = getPlacementStatus(placementPolicy, repFactor);
     this.numReplicas = healthyReplicas.size();
+    this.numKeys = getContainerKeyCount(container.getContainerID());
   }
 
   public long getContainerID() {
@@ -117,6 +126,10 @@ public class ContainerHealthStatus {
     return numReplicas == 0;
   }
 
+  public boolean isEmpty() {
+    return numKeys == 0;
+  }
+
   private ContainerPlacementStatus getPlacementStatus(
       PlacementPolicy policy, int repFactor) {
     List<DatanodeDetails> dns = healthyReplicas.stream()
@@ -124,4 +137,17 @@ public class ContainerHealthStatus {
         .collect(Collectors.toList());
     return policy.validateContainerPlacement(dns, repFactor);
   }
+
+  private long getContainerKeyCount(long containerID) {
+    try {
+      return reconContainerMetadataManager.getKeyCountForContainer(
+          containerID);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public long getNumKeys() {
+    return numKeys;
+  }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index c5fc6ab625..bb93923cfd 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.ozone.recon.fsck;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.scm.ReconScmTask;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import org.apache.hadoop.util.Time;
@@ -48,6 +52,10 @@ import org.jooq.Cursor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
+import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_KEYS;
+import static org.apache.hadoop.ozone.recon.ReconConstants.TOTAL_USED_BYTES;
+
 
 /**
  * Class that scans the list of containers and keeps track of containers with
@@ -63,6 +71,7 @@ public class ContainerHealthTask extends ReconScmTask {
   private StorageContainerServiceProvider scmClient;
   private ContainerManager containerManager;
   private ContainerHealthSchemaManager containerHealthSchemaManager;
+  private ReconContainerMetadataManager reconContainerMetadataManager;
   private PlacementPolicy placementPolicy;
   private final long interval;
 
@@ -74,10 +83,12 @@ public class ContainerHealthTask extends ReconScmTask {
       ReconTaskStatusDao reconTaskStatusDao,
       ContainerHealthSchemaManager containerHealthSchemaManager,
       PlacementPolicy placementPolicy,
-      ReconTaskConfig reconTaskConfig) {
+      ReconTaskConfig reconTaskConfig,
+      ReconContainerMetadataManager reconContainerMetadataManager) {
     super(reconTaskStatusDao);
     this.scmClient = scmClient;
     this.containerHealthSchemaManager = containerHealthSchemaManager;
+    this.reconContainerMetadataManager = reconContainerMetadataManager;
     this.placementPolicy = placementPolicy;
     this.containerManager = containerManager;
     interval = reconTaskConfig.getMissingContainerTaskInterval().toMillis();
@@ -100,10 +111,23 @@ public class ContainerHealthTask extends ReconScmTask {
 
   public void triggerContainerHealthCheck() {
     lock.writeLock().lock();
+    // Map contains all UNHEALTHY STATES as keys and value is another map
+    // with 3 keys (CONTAINER_COUNT, TOTAL_KEYS, TOTAL_USED_BYTES) and value
+    // is count for each of these 3 stats.
+    // E.g. <MISSING, <CONTAINER_COUNT, 1>>, <MISSING, <TOTAL_KEYS, 10>>,
+    // <MISSING, <TOTAL_USED_BYTES, 2048>>,
+    // <EMPTY_MISSING, <CONTAINER_COUNT, 10>>, <EMPTY_MISSING, <TOTAL_KEYS, 2>>,
+    // <EMPTY_MISSING, <TOTAL_USED_BYTES, 2048>>
+    Map<UnHealthyContainerStates, Map<String, Long>>
+        unhealthyContainerStateStatsMap;
     try {
+      unhealthyContainerStateStatsMap = new HashMap<>(Collections.emptyMap());
+      initializeUnhealthyContainerStateStatsMap(
+          unhealthyContainerStateStatsMap);
       long start = Time.monotonicNow();
       long currentTime = System.currentTimeMillis();
-      long existingCount = processExistingDBRecords(currentTime);
+      long existingCount = processExistingDBRecords(currentTime,
+          unhealthyContainerStateStatsMap);
       LOG.info("Container Health task thread took {} milliseconds to" +
               " process {} existing database records.",
           Time.monotonicNow() - start, existingCount);
@@ -111,31 +135,77 @@ public class ContainerHealthTask extends ReconScmTask {
       final List<ContainerInfo> containers = containerManager.getContainers();
       containers.stream()
           .filter(c -> !processedContainers.contains(c))
-          .forEach(c -> processContainer(c, currentTime));
+          .forEach(c -> processContainer(c, currentTime,
+              unhealthyContainerStateStatsMap));
       recordSingleRunCompletion();
       LOG.info("Container Health task thread took {} milliseconds for" +
               " processing {} containers.", Time.monotonicNow() - start,
           containers.size());
+      logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
       processedContainers.clear();
     } finally {
       lock.writeLock().unlock();
     }
   }
 
+  private void logUnhealthyContainerStats(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap) {
+    // If any EMPTY_MISSING containers, then it is possible that such
+    // containers got stuck in the closing state which never got
+    // any replicas created on the datanodes. In this case, we log it as
+    // EMPTY, and insert as EMPTY_MISSING in UNHEALTHY_CONTAINERS table.
+    unhealthyContainerStateStatsMap.entrySet().forEach(stateEntry -> {
+      UnHealthyContainerStates unhealthyContainerState = stateEntry.getKey();
+      Map<String, Long> containerStateStatsMap = stateEntry.getValue();
+      StringBuilder logMsgBuilder =
+          new StringBuilder(unhealthyContainerState.toString());
+      logMsgBuilder.append(" **Container State Stats:** \n\t");
+      containerStateStatsMap.entrySet().forEach(statsEntry -> {
+        logMsgBuilder.append(statsEntry.getKey());
+        logMsgBuilder.append(" -> ");
+        logMsgBuilder.append(statsEntry.getValue());
+        logMsgBuilder.append(" , ");
+      });
+      LOG.info(logMsgBuilder.toString());
+    });
+  }
+
+  private void initializeUnhealthyContainerStateStatsMap(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap) {
+    unhealthyContainerStateStatsMap.put(
+        UnHealthyContainerStates.MISSING, new HashMap<>());
+    unhealthyContainerStateStatsMap.put(
+        UnHealthyContainerStates.EMPTY_MISSING, new HashMap<>());
+    unhealthyContainerStateStatsMap.put(
+        UnHealthyContainerStates.UNDER_REPLICATED, new HashMap<>());
+    unhealthyContainerStateStatsMap.put(
+        UnHealthyContainerStates.OVER_REPLICATED, new HashMap<>());
+    unhealthyContainerStateStatsMap.put(
+        UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>());
+  }
+
   private ContainerHealthStatus setCurrentContainer(long recordId)
       throws ContainerNotFoundException {
     ContainerInfo container =
         containerManager.getContainer(ContainerID.valueOf(recordId));
     Set<ContainerReplica> replicas =
         containerManager.getContainerReplicas(container.containerID());
-    return new ContainerHealthStatus(container, replicas, placementPolicy);
+    return new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
   }
 
-  private void completeProcessingContainer(ContainerHealthStatus container,
-      Set<String> existingRecords, long currentTime) {
+  private void completeProcessingContainer(
+      ContainerHealthStatus container,
+      Set<String> existingRecords,
+      long currentTime,
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateCountMap) {
     containerHealthSchemaManager.insertUnhealthyContainerRecords(
         ContainerHealthRecords.generateUnhealthyRecords(
-            container, existingRecords, currentTime));
+            container, existingRecords, currentTime,
+            unhealthyContainerStateCountMap));
     processedContainers.add(container.getContainer());
   }
 
@@ -151,9 +221,13 @@ public class ContainerHealthTask extends ReconScmTask {
    * additional records need to be added to the database.
    *
    * @param currentTime Timestamp to place on all records generated by this run
+   * @param unhealthyContainerStateCountMap
    * @return Count of records processed
    */
-  private long processExistingDBRecords(long currentTime) {
+  private long processExistingDBRecords(long currentTime,
+                                        Map<UnHealthyContainerStates,
+                                            Map<String, Long>>
+                                            unhealthyContainerStateCountMap) {
     long recordCount = 0;
     try (Cursor<UnhealthyContainersRecord> cursor =
              containerHealthSchemaManager.getAllUnhealthyRecordsCursor()) {
@@ -168,7 +242,8 @@ public class ContainerHealthTask extends ReconScmTask {
           }
           if (currentContainer.getContainerID() != rec.getContainerId()) {
             completeProcessingContainer(
-                currentContainer, existingRecords, currentTime);
+                currentContainer, existingRecords, currentTime,
+                unhealthyContainerStateCountMap);
             existingRecords.clear();
             currentContainer = setCurrentContainer(rec.getContainerId());
           }
@@ -195,18 +270,22 @@ public class ContainerHealthTask extends ReconScmTask {
       // Remember to finish processing the last container
       if (currentContainer != null) {
         completeProcessingContainer(
-            currentContainer, existingRecords, currentTime);
+            currentContainer, existingRecords, currentTime,
+            unhealthyContainerStateCountMap);
       }
     }
     return recordCount;
   }
 
-  private void processContainer(ContainerInfo container, long currentTime) {
+  private void processContainer(ContainerInfo container, long currentTime,
+                                Map<UnHealthyContainerStates,
+                                    Map<String, Long>>
+                                      unhealthyContainerStateStatsMap) {
     try {
       Set<ContainerReplica> containerReplicas =
           containerManager.getContainerReplicas(container.containerID());
-      ContainerHealthStatus h = new ContainerHealthStatus(
-          container, containerReplicas, placementPolicy);
+      ContainerHealthStatus h = new ContainerHealthStatus(container,
+          containerReplicas, placementPolicy, reconContainerMetadataManager);
       if (h.isHealthy() || h.isDeleted()) {
         return;
       }
@@ -215,7 +294,8 @@ public class ContainerHealthTask extends ReconScmTask {
         return;
       }
       containerHealthSchemaManager.insertUnhealthyContainerRecords(
-          ContainerHealthRecords.generateUnhealthyRecords(h, currentTime));
+          ContainerHealthRecords.generateUnhealthyRecords(h, currentTime,
+              unhealthyContainerStateStatsMap));
     } catch (ContainerNotFoundException e) {
       LOG.error("Container not found while processing container in Container " +
           "Health task", e);
@@ -298,8 +378,11 @@ public class ContainerHealthTask extends ReconScmTask {
     }
 
     public static List<UnhealthyContainers> generateUnhealthyRecords(
-        ContainerHealthStatus container, long time) {
-      return generateUnhealthyRecords(container, new HashSet<>(), time);
+        ContainerHealthStatus container, long time,
+        Map<UnHealthyContainerStates, Map<String, Long>>
+            unhealthyContainerStateStatsMap) {
+      return generateUnhealthyRecords(container, new HashSet<>(), time,
+          unhealthyContainerStateStatsMap);
     }
 
     /**
@@ -312,7 +395,9 @@ public class ContainerHealthTask extends ReconScmTask {
      */
     public static List<UnhealthyContainers> generateUnhealthyRecords(
         ContainerHealthStatus container, Set<String> recordForStateExists,
-        long time) {
+        long time,
+        Map<UnHealthyContainerStates, Map<String, Long>>
+            unhealthyContainerStateStatsMap) {
       List<UnhealthyContainers> records = new ArrayList<>();
       if (container.isHealthy() || container.isDeleted()) {
         return records;
@@ -320,9 +405,32 @@ public class ContainerHealthTask extends ReconScmTask {
 
       if (container.isMissing()
           && !recordForStateExists.contains(
-              UnHealthyContainerStates.MISSING.toString())) {
-        records.add(
-            recordForState(container, UnHealthyContainerStates.MISSING, time));
+          UnHealthyContainerStates.MISSING.toString())) {
+        if (!container.isEmpty()) {
+          LOG.info("Non-empty container {} is missing. It has {} " +
+                  "keys and {} bytes used according to SCM metadata. " +
+                  "Please visit Recon's missing container page for a list of " +
+                  "keys (and their metadata) mapped to this container.",
+              container.getContainerID(), container.getNumKeys(),
+              container.getContainer().getUsedBytes());
+          records.add(
+              recordForState(container, UnHealthyContainerStates.MISSING,
+                  time));
+          populateContainerStats(container, UnHealthyContainerStates.MISSING,
+              unhealthyContainerStateStatsMap);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Empty container {} is missing. Kindly check the " +
+                "consolidated container stats per UNHEALTHY state logged as " +
+                "starting with **Container State Stats:**");
+          }
+          records.add(
+              recordForState(container, UnHealthyContainerStates.EMPTY_MISSING,
+                  time));
+          populateContainerStats(container,
+              UnHealthyContainerStates.EMPTY_MISSING,
+              unhealthyContainerStateStatsMap);
+        }
         // A container cannot have any other records if it is missing so return
         return records;
       }
@@ -332,6 +440,9 @@ public class ContainerHealthTask extends ReconScmTask {
               UnHealthyContainerStates.UNDER_REPLICATED.toString())) {
         records.add(recordForState(
             container, UnHealthyContainerStates.UNDER_REPLICATED, time));
+        populateContainerStats(container,
+            UnHealthyContainerStates.UNDER_REPLICATED,
+            unhealthyContainerStateStatsMap);
       }
 
       if (container.isOverReplicated()
@@ -339,6 +450,9 @@ public class ContainerHealthTask extends ReconScmTask {
               UnHealthyContainerStates.OVER_REPLICATED.toString())) {
         records.add(recordForState(
             container, UnHealthyContainerStates.OVER_REPLICATED, time));
+        populateContainerStats(container,
+            UnHealthyContainerStates.OVER_REPLICATED,
+            unhealthyContainerStateStatsMap);
       }
 
       if (container.isMisReplicated()
@@ -346,6 +460,9 @@ public class ContainerHealthTask extends ReconScmTask {
               UnHealthyContainerStates.MIS_REPLICATED.toString())) {
         records.add(recordForState(
             container, UnHealthyContainerStates.MIS_REPLICATED, time));
+        populateContainerStats(container,
+            UnHealthyContainerStates.MIS_REPLICATED,
+            unhealthyContainerStateStatsMap);
       }
 
       return records;
@@ -440,4 +557,24 @@ public class ContainerHealthTask extends ReconScmTask {
       }
     }
   }
+
+  private static void populateContainerStats(
+      ContainerHealthStatus container,
+      UnHealthyContainerStates unhealthyState,
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap) {
+    if (unhealthyContainerStateStatsMap.containsKey(unhealthyState)) {
+      Map<String, Long> containerStatsMap =
+          unhealthyContainerStateStatsMap.get(unhealthyState);
+      containerStatsMap.compute(CONTAINER_COUNT,
+          (containerCount, value) -> (value == null) ? 1 : (value + 1));
+      containerStatsMap.compute(TOTAL_KEYS,
+          (totalKeyCount, value) -> (value == null) ? container.getNumKeys() :
+              (value + container.getNumKeys()));
+      containerStatsMap.compute(TOTAL_USED_BYTES,
+          (totalUsedBytes, value) -> (value == null) ?
+              container.getContainer().getUsedBytes() :
+              (value + container.getContainer().getUsedBytes()));
+    }
+  }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 1c72d990b3..464ec1a5ee 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -239,9 +239,9 @@ public class ReconStorageContainerManagerFacade
         reconTaskStatusDao,
         reconTaskConfig);
     ContainerHealthTask containerHealthTask = new ContainerHealthTask(
-        containerManager, scmServiceProvider,
-        reconTaskStatusDao, containerHealthSchemaManager,
-        containerPlacementPolicy, reconTaskConfig);
+        containerManager, scmServiceProvider, reconTaskStatusDao,
+        containerHealthSchemaManager, containerPlacementPolicy, 
reconTaskConfig,
+        reconContainerMetadataManager);
 
     this.containerSizeCountTask = new ContainerSizeCountTask(
         containerManager,
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
index 572e4d7f51..4e86b72709 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthStatus.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -46,11 +47,13 @@ public class TestContainerHealthStatus {
 
   private PlacementPolicy placementPolicy;
   private ContainerInfo container;
+  private ReconContainerMetadataManager reconContainerMetadataManager;
 
   @BeforeEach
   public void setup() {
     placementPolicy = mock(PlacementPolicy.class);
     container = mock(ContainerInfo.class);
+    reconContainerMetadataManager = mock(ReconContainerMetadataManager.class);
     when(container.getReplicationConfig())
         .thenReturn(RatisReplicationConfig
             .getInstance(HddsProtos.ReplicationFactor.THREE));
@@ -68,7 +71,8 @@ public class TestContainerHealthStatus {
         ContainerReplicaProto.State.CLOSED,
         ContainerReplicaProto.State.CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertTrue(status.isHealthy());
     assertFalse(status.isOverReplicated());
     assertFalse(status.isUnderReplicated());
@@ -91,7 +95,8 @@ public class TestContainerHealthStatus {
         ContainerReplicaProto.State.CLOSED,
         ContainerReplicaProto.State.UNHEALTHY);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertTrue(status.isHealthy());
     assertFalse(status.isOverReplicated());
     assertFalse(status.isUnderReplicated());
@@ -105,7 +110,8 @@ public class TestContainerHealthStatus {
   public void testMissingContainer() {
     Set<ContainerReplica> replicas = new HashSet<>();
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertFalse(status.isHealthy());
     assertFalse(status.isOverReplicated());
     assertFalse(status.isUnderReplicated());
@@ -120,7 +126,8 @@ public class TestContainerHealthStatus {
     Set<ContainerReplica> replicas = generateReplicas(container,
         ContainerReplicaProto.State.CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertFalse(status.isHealthy());
     assertFalse(status.isMissing());
     assertFalse(status.isOverReplicated());
@@ -138,7 +145,8 @@ public class TestContainerHealthStatus {
         ContainerReplicaProto.State.CLOSED,
         ContainerReplicaProto.State.CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertFalse(status.isHealthy());
     assertFalse(status.isMissing());
     assertFalse(status.isUnderReplicated());
@@ -158,7 +166,8 @@ public class TestContainerHealthStatus {
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     assertFalse(status.isHealthy());
     assertFalse(status.isMissing());
     assertFalse(status.isUnderReplicated());
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
index 77cd4bacce..847b5d98c7 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
@@ -49,6 +49,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import org.apache.ozone.test.LambdaTestUtils;
@@ -78,6 +79,8 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
             unHealthyContainersTableHandle);
     ReconStorageContainerManagerFacade scmMock =
         mock(ReconStorageContainerManagerFacade.class);
+    ReconContainerMetadataManager reconContainerMetadataManager =
+        mock(ReconContainerMetadataManager.class);
     MockPlacementPolicy placementMock = new MockPlacementPolicy();
     ContainerManager containerManagerMock = mock(ContainerManager.class);
     StorageContainerServiceProvider scmClientMock =
@@ -87,9 +90,10 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     ContainerReplica healthyReplicaMock = mock(ContainerReplica.class);
     when(healthyReplicaMock.getState()).thenReturn(State.CLOSED);
 
-    // Create 6 containers. The first 5 will have various unhealthy states
-    // defined below. The container with ID=6 will be healthy.
-    List<ContainerInfo> mockContainers = getMockContainers(6);
+    // Create 7 containers. The first 5 will have various unhealthy states
+    // defined below. The container with ID=6 will be healthy and the
+    // container with ID=7 will be MISSING (no replicas, but it has keys).
+    List<ContainerInfo> mockContainers = getMockContainers(7);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
     when(containerManagerMock.getContainers()).thenReturn(mockContainers);
@@ -128,6 +132,10 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
         .thenReturn(getMockReplicas(6L,
             State.CLOSED, State.CLOSED, State.CLOSED));
 
+    // return 0 replicas for container ID 7 -> MISSING, as it still has keys
+    when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(7L)))
+        .thenReturn(Collections.emptySet());
+
     List<UnhealthyContainers> all = unHealthyContainersTableHandle.findAll();
     Assertions.assertTrue(all.isEmpty());
 
@@ -135,14 +143,16 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
     ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
     reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
+    when(reconContainerMetadataManager.getKeyCountForContainer(
+        7L)).thenReturn(5L);
     ContainerHealthTask containerHealthTask =
         new ContainerHealthTask(scmMock.getContainerManager(),
             scmMock.getScmServiceProvider(),
             reconTaskStatusDao, containerHealthSchemaManager,
-            placementMock, reconTaskConfig);
+            placementMock, reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
     LambdaTestUtils.await(6000, 1000, () ->
-        (unHealthyContainersTableHandle.count() == 5));
+        (unHealthyContainersTableHandle.count() == 6));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
     assertEquals("UNDER_REPLICATED", rec.getContainerState());
@@ -162,6 +172,10 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
         unhealthyContainers.get(0).getActualReplicaCount().intValue());
 
     rec = unHealthyContainersTableHandle.fetchByContainerId(3L).get(0);
+    assertEquals("EMPTY_MISSING", rec.getContainerState());
+    assertEquals(3, rec.getReplicaDelta().intValue());
+
+    rec = unHealthyContainersTableHandle.fetchByContainerId(7L).get(0);
     assertEquals("MISSING", rec.getContainerState());
     assertEquals(3, rec.getReplicaDelta().intValue());
 
@@ -205,7 +219,7 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     placementMock.setMisRepWhenDnPresent(null);
 
     LambdaTestUtils.await(6000, 1000, () ->
-        (unHealthyContainersTableHandle.count() == 3));
+        (unHealthyContainersTableHandle.count() == 4));
     rec = unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
     assertEquals("UNDER_REPLICATED", rec.getContainerState());
     assertEquals(1, rec.getReplicaDelta().intValue());
@@ -215,7 +229,10 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
         unHealthyContainersTableHandle.fetchByContainerId(2L).size());
 
     rec = unHealthyContainersTableHandle.fetchByContainerId(3L).get(0);
+    assertEquals("EMPTY_MISSING", rec.getContainerState());
+    assertEquals(3, rec.getReplicaDelta().intValue());
 
+    rec = unHealthyContainersTableHandle.fetchByContainerId(7L).get(0);
     assertEquals("MISSING", rec.getContainerState());
     assertEquals(3, rec.getReplicaDelta().intValue());
 
@@ -243,10 +260,12 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     ContainerManager containerManagerMock = mock(ContainerManager.class);
     StorageContainerServiceProvider scmClientMock =
         mock(StorageContainerServiceProvider.class);
+    ReconContainerMetadataManager reconContainerMetadataManager =
+        mock(ReconContainerMetadataManager.class);
 
     // Create 2 containers. The first is OPEN will no replicas, the second is
     // CLOSED with no replicas.
-    List<ContainerInfo> mockContainers = getMockContainers(2);
+    List<ContainerInfo> mockContainers = getMockContainers(3);
     when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock);
     when(scmMock.getContainerManager()).thenReturn(containerManagerMock);
     when(containerManagerMock.getContainers()).thenReturn(mockContainers);
@@ -255,7 +274,7 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
       when(scmClientMock.getContainerWithPipeline(c.getContainerID()))
           .thenReturn(new ContainerWithPipeline(c, null));
     }
-    // Container State OPEN with no replicas
+    // Non-empty container (has keys) with OPEN State and no replicas
     when(containerManagerMock.getContainer(ContainerID.valueOf(1L)).getState())
         .thenReturn(HddsProtos.LifeCycleState.OPEN);
     when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(1L)))
@@ -272,6 +291,14 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     when(scmClientMock.getContainerWithPipeline(2))
         .thenReturn(new ContainerWithPipeline(mockDeletedContainer, null));
 
+    // Empty container (no keys) with OPEN State and no replicas
+    when(containerManagerMock.getContainer(ContainerID.valueOf(3L)).getState())
+        .thenReturn(HddsProtos.LifeCycleState.OPEN);
+    when(containerManagerMock.getContainerReplicas(ContainerID.valueOf(3L)))
+        .thenReturn(Collections.emptySet());
+    when(scmClientMock.getContainerWithPipeline(3))
+        .thenReturn(new ContainerWithPipeline(mockContainers.get(0), null));
+
     List<UnhealthyContainers> all = unHealthyContainersTableHandle.findAll();
     Assertions.assertTrue(all.isEmpty());
 
@@ -279,19 +306,26 @@ public class TestContainerHealthTask extends 
AbstractReconSqlDBTest {
     ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
     ReconTaskConfig reconTaskConfig = new ReconTaskConfig();
     reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2));
+    when(reconContainerMetadataManager.getKeyCountForContainer(
+        1L)).thenReturn(5L);
     ContainerHealthTask containerHealthTask =
         new ContainerHealthTask(scmMock.getContainerManager(),
             scmMock.getScmServiceProvider(),
             reconTaskStatusDao, containerHealthSchemaManager,
-            placementMock, reconTaskConfig);
+            placementMock, reconTaskConfig, reconContainerMetadataManager);
     containerHealthTask.start();
     LambdaTestUtils.await(6000, 1000, () ->
-        (unHealthyContainersTableHandle.count() == 1));
+        (unHealthyContainersTableHandle.count() == 2));
     UnhealthyContainers rec =
         unHealthyContainersTableHandle.fetchByContainerId(1L).get(0);
     assertEquals("MISSING", rec.getContainerState());
     assertEquals(3, rec.getReplicaDelta().intValue());
 
+    rec =
+        unHealthyContainersTableHandle.fetchByContainerId(3L).get(0);
+    assertEquals("EMPTY_MISSING", rec.getContainerState());
+    assertEquals(3, rec.getReplicaDelta().intValue());
+
     ReconTaskStatus taskStatus =
         reconTaskStatusDao.findById(containerHealthTask.getTaskName());
     Assertions.assertTrue(taskStatus.getLastUpdatedTimestamp() >
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
index 4e86ca9056..99a9961cc2 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
@@ -25,19 +25,26 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import 
org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
 import org.hadoop.ozone.recon.schema.tables.pojos.UnhealthyContainers;
 import org.hadoop.ozone.recon.schema.tables.records.UnhealthyContainersRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -50,20 +57,33 @@ import static org.mockito.Mockito.when;
  * records to store in the database.
  */
 public class TestContainerHealthTaskRecordGenerator {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestContainerHealthTaskRecordGenerator.class);
   private PlacementPolicy placementPolicy;
   private ContainerInfo container;
+  private ContainerInfo emptyContainer;
+  private ReconContainerMetadataManager reconContainerMetadataManager;
 
   @BeforeEach
-  public void setup() {
+  public void setup() throws IOException {
     placementPolicy = mock(PlacementPolicy.class);
     container = mock(ContainerInfo.class);
+    emptyContainer = mock(ContainerInfo.class);
+    reconContainerMetadataManager = mock(ReconContainerMetadataManager.class);
     when(container.getReplicationConfig())
         .thenReturn(
             RatisReplicationConfig
                 .getInstance(HddsProtos.ReplicationFactor.THREE));
     when(container.containerID()).thenReturn(ContainerID.valueOf(123456));
     when(container.getContainerID()).thenReturn((long)123456);
+    when(reconContainerMetadataManager.getKeyCountForContainer(
+        (long) 123456)).thenReturn(5L);
+    when(emptyContainer.getReplicationConfig())
+        .thenReturn(
+            RatisReplicationConfig
+                .getInstance(HddsProtos.ReplicationFactor.THREE));
+    when(emptyContainer.containerID()).thenReturn(ContainerID.valueOf(345678));
+    when(emptyContainer.getContainerID()).thenReturn((long) 345678);
     when(placementPolicy.validateContainerPlacement(
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 1, 1));
@@ -73,7 +93,8 @@ public class TestContainerHealthTaskRecordGenerator {
   public void testMissingRecordRetained() {
     Set<ContainerReplica> replicas = new HashSet<>();
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     // Missing record should be retained
     assertTrue(ContainerHealthTask.ContainerHealthRecords
         .retainOrUpdateRecord(status, missingRecord()
@@ -91,7 +112,8 @@ public class TestContainerHealthTaskRecordGenerator {
         ));
 
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     assertFalse(ContainerHealthTask.ContainerHealthRecords
         .retainOrUpdateRecord(status, missingRecord()
         ));
@@ -103,7 +125,8 @@ public class TestContainerHealthTaskRecordGenerator {
     Set<ContainerReplica> replicas =
         generateReplicas(container, CLOSED, CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
 
     UnhealthyContainersRecord rec = underReplicatedRecord();
     assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -125,7 +148,8 @@ public class TestContainerHealthTaskRecordGenerator {
 
     // Container is now replicated OK - should be removed.
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     assertFalse(ContainerHealthTask.ContainerHealthRecords
         .retainOrUpdateRecord(status, rec));
   }
@@ -136,7 +160,8 @@ public class TestContainerHealthTaskRecordGenerator {
     Set<ContainerReplica> replicas =
         generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
 
     UnhealthyContainersRecord rec = overReplicatedRecord();
     assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -158,7 +183,8 @@ public class TestContainerHealthTaskRecordGenerator {
 
     // Container is now replicated OK - should be removed.
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     assertFalse(ContainerHealthTask.ContainerHealthRecords
         .retainOrUpdateRecord(status, rec));
   }
@@ -172,7 +198,8 @@ public class TestContainerHealthTaskRecordGenerator {
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(2, 3, 5));
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
 
     UnhealthyContainersRecord rec = misReplicatedRecord();
     assertTrue(ContainerHealthTask.ContainerHealthRecords
@@ -197,31 +224,58 @@ public class TestContainerHealthTaskRecordGenerator {
     when(placementPolicy.validateContainerPlacement(
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(3, 3, 5));
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     assertFalse(ContainerHealthTask.ContainerHealthRecords
         .retainOrUpdateRecord(status, rec));
   }
 
   @Test
+  @SuppressWarnings("checkstyle:methodlength")
   public void testCorrectRecordsGenerated() {
     Set<ContainerReplica> replicas =
         generateReplicas(container, CLOSED, CLOSED, CLOSED);
-
+    Map<UnHealthyContainerStates, Map<String, Long>>
+        unhealthyContainerStateStatsMap =
+        new HashMap<>();
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
     // HEALTHY container - no records generated.
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     List<UnhealthyContainers> records =
         ContainerHealthTask.ContainerHealthRecords
-            .generateUnhealthyRecords(status, (long)1234567);
+            .generateUnhealthyRecords(status, (long) 1234567,
+                unhealthyContainerStateStatsMap);
     assertEquals(0, records.size());
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
 
     // Over-replicated - expect 1 over replicated record
     replicas =
         generateReplicas(container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED);
     status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     records = ContainerHealthTask.ContainerHealthRecords
-        .generateUnhealthyRecords(status, (long)1234567);
+        .generateUnhealthyRecords(status, (long) 1234567,
+            unhealthyContainerStateStatsMap);
     assertEquals(1, records.size());
     UnhealthyContainers rec = records.get(0);
     assertEquals(UnHealthyContainerStates.OVER_REPLICATED.toString(),
@@ -229,6 +283,24 @@ public class TestContainerHealthTaskRecordGenerator {
     assertEquals(3, rec.getExpectedReplicaCount().intValue());
     assertEquals(5, rec.getActualReplicaCount().intValue());
     assertEquals(-2, rec.getReplicaDelta().intValue());
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
 
     // Under and Mis Replicated - expect 2 records - mis and under replicated
     replicas =
@@ -237,10 +309,30 @@ public class TestContainerHealthTaskRecordGenerator {
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
     status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     records = ContainerHealthTask.ContainerHealthRecords
-        .generateUnhealthyRecords(status, (long)1234567);
+        .generateUnhealthyRecords(status, (long) 1234567,
+            unhealthyContainerStateStatsMap);
     assertEquals(2, records.size());
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
 
     rec = findRecordForState(records, UnHealthyContainerStates.MIS_REPLICATED);
     assertEquals(UnHealthyContainerStates.MIS_REPLICATED.toString(),
@@ -265,51 +357,161 @@ public class TestContainerHealthTaskRecordGenerator {
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
     status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     records = ContainerHealthTask.ContainerHealthRecords
-        .generateUnhealthyRecords(status, (long)1234567);
+        .generateUnhealthyRecords(status, (long) 1234567,
+            unhealthyContainerStateStatsMap);
     assertEquals(1, records.size());
     rec = records.get(0);
     assertEquals(UnHealthyContainerStates.MISSING.toString(),
         rec.getContainerState());
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
+
+    status =
+        new ContainerHealthStatus(emptyContainer, replicas, placementPolicy,
+            reconContainerMetadataManager);
+    records = ContainerHealthTask.ContainerHealthRecords
+        .generateUnhealthyRecords(status, (long) 345678,
+            unhealthyContainerStateStatsMap);
+    assertEquals(1, records.size());
+    rec = records.get(0);
+    assertEquals(UnHealthyContainerStates.EMPTY_MISSING.toString(),
+        rec.getContainerState());
+
     assertEquals(3, rec.getExpectedReplicaCount().intValue());
     assertEquals(0, rec.getActualReplicaCount().intValue());
     assertEquals(3, rec.getReplicaDelta().intValue());
+
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    unhealthyContainerStateStatsMap.clear();
   }
 
   @Test
   public void testRecordNotGeneratedIfAlreadyExists() {
+    Map<UnHealthyContainerStates, Map<String, Long>>
+        unhealthyContainerStateStatsMap =
+        new HashMap<>();
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
     Set<String> existingRec = new HashSet<>();
-    for (UnHealthyContainerStates s : UnHealthyContainerStates.values()) {
-      existingRec.add(s.toString());
-    }
 
     // Over-replicated
     Set<ContainerReplica> replicas = generateReplicas(
         container, CLOSED, CLOSED, CLOSED, CLOSED, CLOSED);
     ContainerHealthStatus status =
-        new ContainerHealthStatus(container, replicas, placementPolicy);
+        new ContainerHealthStatus(container, replicas, placementPolicy,
+            reconContainerMetadataManager);
     List<UnhealthyContainers> records =
         ContainerHealthTask.ContainerHealthRecords
-            .generateUnhealthyRecords(status, existingRec, (long)1234567);
-    assertEquals(0, records.size());
+            .generateUnhealthyRecords(status, existingRec, (long) 1234567,
+                unhealthyContainerStateStatsMap);
+    assertEquals(1, records.size());
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+        UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
 
     // Missing
     replicas.clear();
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     records = ContainerHealthTask.ContainerHealthRecords
-        .generateUnhealthyRecords(status, existingRec, (long)1234567);
-    assertEquals(0, records.size());
+        .generateUnhealthyRecords(status, existingRec, (long) 1234567,
+            unhealthyContainerStateStatsMap);
+    assertEquals(1, records.size());
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    initializeUnhealthyContainerStateStatsMap(unhealthyContainerStateStatsMap);
 
     // Under and Mis-Replicated
     replicas = generateReplicas(container, CLOSED, CLOSED);
     when(placementPolicy.validateContainerPlacement(
         Mockito.anyList(), Mockito.anyInt()))
         .thenReturn(new ContainerPlacementStatusDefault(1, 2, 5));
-    status = new ContainerHealthStatus(container, replicas, placementPolicy);
+    status = new ContainerHealthStatus(container, replicas, placementPolicy,
+        reconContainerMetadataManager);
     records = ContainerHealthTask.ContainerHealthRecords
-        .generateUnhealthyRecords(status, existingRec, (long)1234567);
-    assertEquals(0, records.size());
+        .generateUnhealthyRecords(status, existingRec, (long) 1234567,
+            unhealthyContainerStateStatsMap);
+    assertEquals(2, records.size());
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.EMPTY_MISSING)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(0, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.OVER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.UNDER_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+    assertEquals(1, unhealthyContainerStateStatsMap.get(
+            UnHealthyContainerStates.MIS_REPLICATED)
+        .getOrDefault(CONTAINER_COUNT, 0L));
+
+    logUnhealthyContainerStats(unhealthyContainerStateStatsMap);
+    unhealthyContainerStateStatsMap.clear();
   }
 
   private UnhealthyContainers findRecordForState(
@@ -359,4 +561,41 @@ public class TestContainerHealthTaskRecordGenerator {
     return replicas;
   }
 
+  // Gives every tracked unhealthy state a fresh, empty per-state stats map.
+  private void initializeUnhealthyContainerStateStatsMap(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap) {
+    UnHealthyContainerStates[] trackedStates = {
+        UnHealthyContainerStates.MISSING,
+        UnHealthyContainerStates.EMPTY_MISSING,
+        UnHealthyContainerStates.UNDER_REPLICATED,
+        UnHealthyContainerStates.OVER_REPLICATED,
+        UnHealthyContainerStates.MIS_REPLICATED};
+    for (UnHealthyContainerStates state : trackedStates) {
+      unhealthyContainerStateStatsMap.put(state, new HashMap<>());
+    }
+  }
+
+  // Logs, at INFO level, one message per unhealthy state with its stats.
+  private void logUnhealthyContainerStats(
+      Map<UnHealthyContainerStates, Map<String, Long>>
+          unhealthyContainerStateStatsMap) {
+    // EMPTY_MISSING presumably covers containers stuck in the closing state
+    // that never got replicas on any datanode (TODO confirm); this helper
+    // does not special-case it - every state is logged the same way.
+    for (Map.Entry<UnHealthyContainerStates, Map<String, Long>> perState
+        : unhealthyContainerStateStatsMap.entrySet()) {
+      StringBuilder message =
+          new StringBuilder(perState.getKey().toString());
+      message.append(" Container State Stats: \n\t");
+      for (Map.Entry<String, Long> stat : perState.getValue().entrySet()) {
+        // Rendered as "name -> value , "; the trailing separator is kept.
+        message.append(stat.getKey())
+            .append(" -> ")
+            .append(stat.getValue())
+            .append(" , ");
+      }
+      LOG.info(message.toString());
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to