siddhantsangwan commented on code in PR #7008:
URL: https://github.com/apache/ozone/pull/7008#discussion_r1803033550


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java:
##########
@@ -50,10 +58,15 @@ public class ContainerSafeModeRule extends
   // Required cutoff % for containers with at least 1 reported replica.
   private double safeModeCutoff;
   // Containers read from scm db (excluding containers in ALLOCATED state).
-  private Map<Long, ContainerInfo> containerMap;
-  private double maxContainer;
-
-  private AtomicLong containerWithMinReplicas = new AtomicLong(0);
+  private Set<Long> reportedContainerIDSet = new HashSet<>();
+  private Map<Long, ContainerInfo> ratisContainerMap;

Review Comment:
   Do we really need `ratisContainerMap` and `ecContainerMap` to be maps at all? As
far as I can see, all we need are the container IDs. We can then just use the
container manager to get the container info object when needed. There's
probably at least a couple of GBs of overhead for storing references to
billions of `ContainerInfo` objects in the map, which we don't really need. It
can just be a set of container IDs.
   
   Going a step further, I feel like we don't need the List of `ContainerInfo`
objects that's being passed into the constructor of this class. Ultimately, all
we need is a mapping from container ID to container info for all container IDs.
So the constructor should take either that mapping as an argument or just the
container manager, since the container manager can be used to get any
information we need about the containers in the system.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java:
##########
@@ -71,127 +84,262 @@ public ContainerSafeModeRule(String ruleName, EventQueue 
eventQueue,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    containerMap = new ConcurrentHashMap<>();
-    containers.forEach(container -> {
-      // There can be containers in OPEN/CLOSING state which were never
-      // created by the client. We are not considering these containers for
-      // now. These containers can be handled by tracking pipelines.
-
-      Optional.ofNullable(container.getState())
-          .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
-              state == HddsProtos.LifeCycleState.CLOSED)
-              && container.getNumberOfKeys() > 0)
-          .ifPresent(s -> containerMap.put(container.getContainerID(),
-              container));
-    });
-    maxContainer = containerMap.size();
-    long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
-    
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);
+    ratisContainerMap = new ConcurrentHashMap<>();
+    ratisContainerDNsMap = new ConcurrentHashMap<>();
+    ecContainerMap = new ConcurrentHashMap<>();
+    ecContainerDNsMap = new ConcurrentHashMap<>();
 
-    LOG.info("containers with one replica threshold count {}", cutOff);
+    initializeRule(containers);
   }
 
 
   @Override
   protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
-    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
+    return SCMEvents.CONTAINER_REGISTRATION_REPORT;
   }
 
 
   @Override
   protected synchronized boolean validate() {
-    return getCurrentContainerThreshold() >= safeModeCutoff;
+    return (getCurrentContainerThreshold() >= safeModeCutoff) &&
+        (getCurrentECContainerThreshold() >= safeModeCutoff);
   }
 
   @VisibleForTesting
   public synchronized double getCurrentContainerThreshold() {
-    if (maxContainer == 0) {
+    if (ratisMaxContainer == 0) {
       return 1;
     }
-    return (containerWithMinReplicas.doubleValue() / maxContainer);
+    return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
+  }
+
+  @VisibleForTesting
+  public synchronized double getCurrentECContainerThreshold() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
+  }
+
+  public synchronized double getEcMaxContainer() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return ecMaxContainer;
+  }
+
+  private synchronized double getRatisMaxContainer() {
+    if (ratisMaxContainer == 0) {
+      return 1;
+    }
+    return ratisMaxContainer;
   }
 
   @Override
   protected synchronized void process(
       NodeRegistrationContainerReport reportsProto) {
+    DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
+    UUID datanodeUUID = datanodeDetails.getUuid();
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = 
reportsProto.getReport();
 
-    reportsProto.getReport().getReportsList().forEach(c -> {
-      if (containerMap.containsKey(c.getContainerID())) {
-        if (containerMap.remove(c.getContainerID()) != null) {
-          containerWithMinReplicas.getAndAdd(1);
-          getSafeModeMetrics()
-              .incCurrentContainersWithOneReplicaReportedCount();
-        }
+    report.getReportsList().forEach(c -> {
+      long containerID = c.getContainerID();
+
+      // If it is a Ratis container.
+      if (ratisContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ratisContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.FALSE);
+      }
+
+      // If it is a EC container.
+      if (ecContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ecContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.TRUE);
       }
     });
 
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. {} % containers have at least one"
-              + " reported replica.",
-          (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
+          "SCM in safe mode. {} % containers [Ratis] have at least one"
+          + " reported replica, {} % containers [EC] have at N reported 
replica.",
+          ((ratisContainerWithMinReplicas.doubleValue() / 
getRatisMaxContainer()) * 100),
+          ((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 
100)
+      );
+    }
+  }
+
+  /**
+   * Record the reported Container.
+   *
+   * We will differentiate and count according to the type of Container.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   */
+  private void recordReportedContainer(long containerID, boolean 
isEcContainer) {
+    if (!reportedContainerIDSet.contains(containerID)) {

Review Comment:
   Is it possible to get rid of `reportedContainerIDSet` and just use the ratis 
and ec maps?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java:
##########
@@ -71,127 +84,262 @@ public ContainerSafeModeRule(String ruleName, EventQueue 
eventQueue,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    containerMap = new ConcurrentHashMap<>();
-    containers.forEach(container -> {
-      // There can be containers in OPEN/CLOSING state which were never
-      // created by the client. We are not considering these containers for
-      // now. These containers can be handled by tracking pipelines.
-
-      Optional.ofNullable(container.getState())
-          .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
-              state == HddsProtos.LifeCycleState.CLOSED)
-              && container.getNumberOfKeys() > 0)
-          .ifPresent(s -> containerMap.put(container.getContainerID(),
-              container));
-    });
-    maxContainer = containerMap.size();
-    long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
-    
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);
+    ratisContainerMap = new ConcurrentHashMap<>();
+    ratisContainerDNsMap = new ConcurrentHashMap<>();
+    ecContainerMap = new ConcurrentHashMap<>();
+    ecContainerDNsMap = new ConcurrentHashMap<>();
 
-    LOG.info("containers with one replica threshold count {}", cutOff);
+    initializeRule(containers);
   }
 
 
   @Override
   protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
-    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
+    return SCMEvents.CONTAINER_REGISTRATION_REPORT;
   }
 
 
   @Override
   protected synchronized boolean validate() {
-    return getCurrentContainerThreshold() >= safeModeCutoff;
+    return (getCurrentContainerThreshold() >= safeModeCutoff) &&
+        (getCurrentECContainerThreshold() >= safeModeCutoff);
   }
 
   @VisibleForTesting
   public synchronized double getCurrentContainerThreshold() {
-    if (maxContainer == 0) {
+    if (ratisMaxContainer == 0) {
       return 1;
     }
-    return (containerWithMinReplicas.doubleValue() / maxContainer);
+    return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
+  }
+
+  @VisibleForTesting
+  public synchronized double getCurrentECContainerThreshold() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
+  }
+
+  public synchronized double getEcMaxContainer() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return ecMaxContainer;
+  }
+
+  private synchronized double getRatisMaxContainer() {
+    if (ratisMaxContainer == 0) {
+      return 1;
+    }
+    return ratisMaxContainer;
   }
 
   @Override
   protected synchronized void process(
       NodeRegistrationContainerReport reportsProto) {
+    DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
+    UUID datanodeUUID = datanodeDetails.getUuid();
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = 
reportsProto.getReport();
 
-    reportsProto.getReport().getReportsList().forEach(c -> {
-      if (containerMap.containsKey(c.getContainerID())) {
-        if (containerMap.remove(c.getContainerID()) != null) {
-          containerWithMinReplicas.getAndAdd(1);
-          getSafeModeMetrics()
-              .incCurrentContainersWithOneReplicaReportedCount();
-        }
+    report.getReportsList().forEach(c -> {
+      long containerID = c.getContainerID();
+
+      // If it is a Ratis container.
+      if (ratisContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ratisContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.FALSE);
+      }
+
+      // If it is a EC container.
+      if (ecContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ecContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.TRUE);
       }
     });
 
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. {} % containers have at least one"
-              + " reported replica.",
-          (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
+          "SCM in safe mode. {} % containers [Ratis] have at least one"
+          + " reported replica, {} % containers [EC] have at N reported 
replica.",
+          ((ratisContainerWithMinReplicas.doubleValue() / 
getRatisMaxContainer()) * 100),
+          ((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 
100)
+      );
+    }
+  }
+
+  /**
+   * Record the reported Container.
+   *
+   * We will differentiate and count according to the type of Container.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   */
+  private void recordReportedContainer(long containerID, boolean 
isEcContainer) {
+    if (!reportedContainerIDSet.contains(containerID)) {
+      Set<UUID> uuids = isEcContainer ? ecContainerDNsMap.get(containerID) :
+          ratisContainerDNsMap.get(containerID);
+      int minReplica = getMinReplica(containerID, isEcContainer);
+      if (uuids != null && uuids.size() >= minReplica) {
+        reportedContainerIDSet.add(containerID);
+        if (isEcContainer) {
+          getSafeModeMetrics()
+              .incCurrentContainersWithECDataReplicaReportedCount();
+          ecContainerWithMinReplicas.getAndAdd(1);
+        } else {
+          ratisContainerWithMinReplicas.getAndAdd(1);
+          getSafeModeMetrics()
+              .incCurrentContainersWithOneReplicaReportedCount();
+        }
+      }
     }
   }
 
+  /**
+   * Get the minimum replica.
+   *
+   * If it is a Ratis Contianer, the minimum copy is 1.
+   * If it is an EC Container, the minimum copy will be the number of Data in 
replicationConfig.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   * @return MinReplica.
+   */
+  private int getMinReplica(long containerID, boolean isEcContainer) {
+    if (isEcContainer) {
+      ContainerInfo containerInfo = ecContainerMap.get(containerID);
+      if (containerInfo != null) {
+        ReplicationConfig replicationConfig = 
containerInfo.getReplicationConfig();
+        if (replicationConfig != null && replicationConfig instanceof 
ECReplicationConfig) {
+          ECReplicationConfig ecReplicationConfig = (ECReplicationConfig) 
replicationConfig;
+          return ecReplicationConfig.getData();
+        }
+      }
+    }
+    return 1;
+  }
+
+  private void initContainerDNsMap(long containerID, Map<Long, Set<UUID>> 
containerDNsMap,
+      UUID datanodeUUID) {
+    containerDNsMap.computeIfAbsent(containerID, key -> Sets.newHashSet());
+    containerDNsMap.get(containerID).add(datanodeUUID);
+  }
+
   @Override
   protected synchronized void cleanup() {
-    containerMap.clear();
+    ratisContainerMap.clear();
+    ratisContainerDNsMap.clear();
+    ecContainerMap.clear();
+    ecContainerDNsMap.clear();
+    reportedContainerIDSet.clear();
   }
 
   @Override
   public String getStatusText() {
-    List<Long> sampleContainers = containerMap.keySet()
-        .stream()
-        .limit(SAMPLE_CONTAINER_DISPLAY_LIMIT)
-        .collect(Collectors.toList());
 
-    String status = String.format("%% of containers with at least one reported"
-            + " replica (=%1.2f) >= safeModeCutoff (=%1.2f)",
+    // ratis container
+    String status = String.format(
+        "%1.2f%% of [Ratis] Containers(%s / %s) with at least one reported 
replica (=%1.2f) >= " +
+        "safeModeCutoff (=%1.2f);",
+        (ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) 
* 100,
+        ratisContainerWithMinReplicas, (long) getRatisMaxContainer(),
         getCurrentContainerThreshold(), this.safeModeCutoff);
 
-    if (!sampleContainers.isEmpty()) {
+    Set<Long> sampleRatisContainers = ratisContainerDNsMap.entrySet().stream().
+        filter(entry -> entry.getValue().isEmpty()).
+        map(Map.Entry::getKey).
+        limit(SAMPLE_CONTAINER_DISPLAY_LIMIT).
+        collect(Collectors.toSet());
+
+    if (!sampleRatisContainers.isEmpty()) {
       String sampleContainerText =
-          "Sample containers not satisfying the criteria : " + 
sampleContainers;
+          "Sample Ratis Containers not satisfying the criteria : " + 
sampleRatisContainers + ";";
       status = status.concat("\n").concat(sampleContainerText);
     }
 
+    // ec container
+    String ecStatus = String.format(
+        "%1.2f%% of [EC] Containers(%s / %s) with at least N reported replica 
(=%1.2f) >= " +
+        "safeModeCutoff (=%1.2f);",
+        (ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100,
+        ecContainerWithMinReplicas, (long) getEcMaxContainer(),
+        getCurrentECContainerThreshold(), this.safeModeCutoff);
+    status = status.concat("\n").concat(ecStatus);
+
+    Set<Long> sampleEcContainers = ecContainerDNsMap.entrySet().stream().
+        filter(entry -> {
+          Long containerId = entry.getKey();
+          int minReplica = getMinReplica(containerId, Boolean.TRUE);
+          Set<UUID> allReplicas = entry.getValue();
+          if (allReplicas.size() >= minReplica) {
+            return false;
+          }
+          return true;
+        }).
+        map(Map.Entry::getKey).
+        limit(SAMPLE_CONTAINER_DISPLAY_LIMIT).
+        collect(Collectors.toSet());
+
+    if (!sampleEcContainers.isEmpty()) {
+      String sampleECContainerText =
+          "Sample EC Containers not satisfying the criteria : " + 
sampleEcContainers + ";";
+      status = status.concat("\n").concat(sampleECContainerText);
+    }
+
     return status;
   }
 
 
   @Override
   public synchronized void refresh(boolean forceRefresh) {
+    List<ContainerInfo> containers = containerManager.getContainers();
     if (forceRefresh) {
-      reInitializeRule();
+      initializeRule(containers);
     } else {
       if (!validate()) {
-        reInitializeRule();
+        initializeRule(containers);
       }
     }
   }
 
-  private void reInitializeRule() {
-    containerMap.clear();
-    containerManager.getContainers().forEach(container -> {
+  private boolean checkContainerState(LifeCycleState state) {
+    if (state == LifeCycleState.QUASI_CLOSED || state == 
LifeCycleState.CLOSED) {
+      return true;
+    }
+    return false;
+  }
+
+  private void initializeRule(List<ContainerInfo> containers) {
+
+    containers.forEach(container -> {
       // There can be containers in OPEN/CLOSING state which were never
       // created by the client. We are not considering these containers for
       // now. These containers can be handled by tracking pipelines.
 
-      Optional.ofNullable(container.getState())
-          .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
-              state == HddsProtos.LifeCycleState.CLOSED)
-              && container.getNumberOfKeys() > 0)
-          .ifPresent(s -> containerMap.put(container.getContainerID(),
-              container));
+      LifeCycleState containerState = container.getState();
+      ReplicationConfig replicationConfig = container.getReplicationConfig();
+
+      if (checkContainerState(containerState) && container.getNumberOfKeys() > 
0) {
+        if (replicationConfig instanceof RatisReplicationConfig) {

Review Comment:
   It's more intuitive to do something like:
   ```
   container.getReplicationType().equals(HddsProtos.ReplicationType.RATIS)
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java:
##########
@@ -71,127 +84,262 @@ public ContainerSafeModeRule(String ruleName, EventQueue 
eventQueue,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    containerMap = new ConcurrentHashMap<>();
-    containers.forEach(container -> {
-      // There can be containers in OPEN/CLOSING state which were never
-      // created by the client. We are not considering these containers for
-      // now. These containers can be handled by tracking pipelines.
-
-      Optional.ofNullable(container.getState())
-          .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
-              state == HddsProtos.LifeCycleState.CLOSED)
-              && container.getNumberOfKeys() > 0)
-          .ifPresent(s -> containerMap.put(container.getContainerID(),
-              container));
-    });
-    maxContainer = containerMap.size();
-    long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
-    
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);
+    ratisContainerMap = new ConcurrentHashMap<>();
+    ratisContainerDNsMap = new ConcurrentHashMap<>();
+    ecContainerMap = new ConcurrentHashMap<>();
+    ecContainerDNsMap = new ConcurrentHashMap<>();
 
-    LOG.info("containers with one replica threshold count {}", cutOff);
+    initializeRule(containers);
   }
 
 
   @Override
   protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
-    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
+    return SCMEvents.CONTAINER_REGISTRATION_REPORT;
   }
 
 
   @Override
   protected synchronized boolean validate() {
-    return getCurrentContainerThreshold() >= safeModeCutoff;
+    return (getCurrentContainerThreshold() >= safeModeCutoff) &&
+        (getCurrentECContainerThreshold() >= safeModeCutoff);
   }
 
   @VisibleForTesting
   public synchronized double getCurrentContainerThreshold() {
-    if (maxContainer == 0) {
+    if (ratisMaxContainer == 0) {
       return 1;
     }
-    return (containerWithMinReplicas.doubleValue() / maxContainer);
+    return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
+  }
+
+  @VisibleForTesting
+  public synchronized double getCurrentECContainerThreshold() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
+  }
+
+  public synchronized double getEcMaxContainer() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return ecMaxContainer;
+  }
+
+  private synchronized double getRatisMaxContainer() {
+    if (ratisMaxContainer == 0) {
+      return 1;
+    }
+    return ratisMaxContainer;
   }
 
   @Override
   protected synchronized void process(
       NodeRegistrationContainerReport reportsProto) {
+    DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
+    UUID datanodeUUID = datanodeDetails.getUuid();
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = 
reportsProto.getReport();
 
-    reportsProto.getReport().getReportsList().forEach(c -> {
-      if (containerMap.containsKey(c.getContainerID())) {
-        if (containerMap.remove(c.getContainerID()) != null) {
-          containerWithMinReplicas.getAndAdd(1);
-          getSafeModeMetrics()
-              .incCurrentContainersWithOneReplicaReportedCount();
-        }
+    report.getReportsList().forEach(c -> {
+      long containerID = c.getContainerID();
+
+      // If it is a Ratis container.
+      if (ratisContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ratisContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.FALSE);
+      }
+
+      // If it is a EC container.
+      if (ecContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ecContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.TRUE);
       }
     });
 
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. {} % containers have at least one"
-              + " reported replica.",
-          (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
+          "SCM in safe mode. {} % containers [Ratis] have at least one"
+          + " reported replica, {} % containers [EC] have at N reported 
replica.",
+          ((ratisContainerWithMinReplicas.doubleValue() / 
getRatisMaxContainer()) * 100),
+          ((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 
100)
+      );
+    }
+  }
+
+  /**
+   * Record the reported Container.
+   *
+   * We will differentiate and count according to the type of Container.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   */
+  private void recordReportedContainer(long containerID, boolean 
isEcContainer) {
+    if (!reportedContainerIDSet.contains(containerID)) {
+      Set<UUID> uuids = isEcContainer ? ecContainerDNsMap.get(containerID) :
+          ratisContainerDNsMap.get(containerID);
+      int minReplica = getMinReplica(containerID, isEcContainer);
+      if (uuids != null && uuids.size() >= minReplica) {
+        reportedContainerIDSet.add(containerID);
+        if (isEcContainer) {
+          getSafeModeMetrics()
+              .incCurrentContainersWithECDataReplicaReportedCount();
+          ecContainerWithMinReplicas.getAndAdd(1);
+        } else {
+          ratisContainerWithMinReplicas.getAndAdd(1);
+          getSafeModeMetrics()
+              .incCurrentContainersWithOneReplicaReportedCount();
+        }
+      }
     }
   }
 
+  /**
+   * Get the minimum replica.
+   *
+   * If it is a Ratis Contianer, the minimum copy is 1.
+   * If it is an EC Container, the minimum copy will be the number of Data in 
replicationConfig.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   * @return MinReplica.
+   */
+  private int getMinReplica(long containerID, boolean isEcContainer) {
+    if (isEcContainer) {
+      ContainerInfo containerInfo = ecContainerMap.get(containerID);
+      if (containerInfo != null) {
+        ReplicationConfig replicationConfig = 
containerInfo.getReplicationConfig();
+        if (replicationConfig != null && replicationConfig instanceof 
ECReplicationConfig) {
+          ECReplicationConfig ecReplicationConfig = (ECReplicationConfig) 
replicationConfig;
+          return ecReplicationConfig.getData();
+        }
+      }
+    }
+    return 1;
+  }
+
+  private void initContainerDNsMap(long containerID, Map<Long, Set<UUID>> 
containerDNsMap,

Review Comment:
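   How about renaming this to `putInContainerDNsMap`? It doesn't just initialize
the entry for the container; it also adds the datanode UUID to that container's
set, so the current name is a bit misleading.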
   



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/ContainerSafeModeRule.java:
##########
@@ -71,127 +84,262 @@ public ContainerSafeModeRule(String ruleName, EventQueue 
eventQueue,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    containerMap = new ConcurrentHashMap<>();
-    containers.forEach(container -> {
-      // There can be containers in OPEN/CLOSING state which were never
-      // created by the client. We are not considering these containers for
-      // now. These containers can be handled by tracking pipelines.
-
-      Optional.ofNullable(container.getState())
-          .filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
-              state == HddsProtos.LifeCycleState.CLOSED)
-              && container.getNumberOfKeys() > 0)
-          .ifPresent(s -> containerMap.put(container.getContainerID(),
-              container));
-    });
-    maxContainer = containerMap.size();
-    long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
-    
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);
+    ratisContainerMap = new ConcurrentHashMap<>();
+    ratisContainerDNsMap = new ConcurrentHashMap<>();
+    ecContainerMap = new ConcurrentHashMap<>();
+    ecContainerDNsMap = new ConcurrentHashMap<>();
 
-    LOG.info("containers with one replica threshold count {}", cutOff);
+    initializeRule(containers);
   }
 
 
   @Override
   protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
-    return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
+    return SCMEvents.CONTAINER_REGISTRATION_REPORT;
   }
 
 
   @Override
   protected synchronized boolean validate() {
-    return getCurrentContainerThreshold() >= safeModeCutoff;
+    return (getCurrentContainerThreshold() >= safeModeCutoff) &&
+        (getCurrentECContainerThreshold() >= safeModeCutoff);
   }
 
   @VisibleForTesting
   public synchronized double getCurrentContainerThreshold() {
-    if (maxContainer == 0) {
+    if (ratisMaxContainer == 0) {
       return 1;
     }
-    return (containerWithMinReplicas.doubleValue() / maxContainer);
+    return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
+  }
+
+  @VisibleForTesting
+  public synchronized double getCurrentECContainerThreshold() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
+  }
+
+  public synchronized double getEcMaxContainer() {
+    if (ecMaxContainer == 0) {
+      return 1;
+    }
+    return ecMaxContainer;
+  }
+
+  private synchronized double getRatisMaxContainer() {
+    if (ratisMaxContainer == 0) {
+      return 1;
+    }
+    return ratisMaxContainer;
   }
 
   @Override
   protected synchronized void process(
       NodeRegistrationContainerReport reportsProto) {
+    DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
+    UUID datanodeUUID = datanodeDetails.getUuid();
+    StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = 
reportsProto.getReport();
 
-    reportsProto.getReport().getReportsList().forEach(c -> {
-      if (containerMap.containsKey(c.getContainerID())) {
-        if (containerMap.remove(c.getContainerID()) != null) {
-          containerWithMinReplicas.getAndAdd(1);
-          getSafeModeMetrics()
-              .incCurrentContainersWithOneReplicaReportedCount();
-        }
+    report.getReportsList().forEach(c -> {
+      long containerID = c.getContainerID();
+
+      // If it is a Ratis container.
+      if (ratisContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ratisContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.FALSE);
+      }
+
+      // If it is a EC container.
+      if (ecContainerMap.containsKey(containerID)) {
+        initContainerDNsMap(containerID, ecContainerDNsMap, datanodeUUID);
+        recordReportedContainer(containerID, Boolean.TRUE);
       }
     });
 
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. {} % containers have at least one"
-              + " reported replica.",
-          (containerWithMinReplicas.doubleValue() / maxContainer) * 100);
+          "SCM in safe mode. {} % containers [Ratis] have at least one"
+          + " reported replica, {} % containers [EC] have at N reported 
replica.",
+          ((ratisContainerWithMinReplicas.doubleValue() / 
getRatisMaxContainer()) * 100),
+          ((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 
100)
+      );
+    }
+  }
+
+  /**
+   * Record the reported Container.
+   *
+   * We will differentiate and count according to the type of Container.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   */
+  private void recordReportedContainer(long containerID, boolean 
isEcContainer) {
+    if (!reportedContainerIDSet.contains(containerID)) {
+      Set<UUID> uuids = isEcContainer ? ecContainerDNsMap.get(containerID) :
+          ratisContainerDNsMap.get(containerID);
+      int minReplica = getMinReplica(containerID, isEcContainer);
+      if (uuids != null && uuids.size() >= minReplica) {
+        reportedContainerIDSet.add(containerID);
+        if (isEcContainer) {
+          getSafeModeMetrics()
+              .incCurrentContainersWithECDataReplicaReportedCount();
+          ecContainerWithMinReplicas.getAndAdd(1);
+        } else {
+          ratisContainerWithMinReplicas.getAndAdd(1);
+          getSafeModeMetrics()
+              .incCurrentContainersWithOneReplicaReportedCount();
+        }
+      }
     }
   }
 
+  /**
+   * Get the minimum replica.
+   *
+   * If it is a Ratis Contianer, the minimum copy is 1.
+   * If it is an EC Container, the minimum copy will be the number of Data in 
replicationConfig.
+   *
+   * @param containerID containerID
+   * @param isEcContainer true, means ECContainer, false, means not 
ECContainer.
+   * @return MinReplica.
+   */
+  private int getMinReplica(long containerID, boolean isEcContainer) {
+    if (isEcContainer) {
+      ContainerInfo containerInfo = ecContainerMap.get(containerID);
+      if (containerInfo != null) {
+        ReplicationConfig replicationConfig = 
containerInfo.getReplicationConfig();
+        if (replicationConfig != null && replicationConfig instanceof 
ECReplicationConfig) {
+          ECReplicationConfig ecReplicationConfig = (ECReplicationConfig) 
replicationConfig;
+          return ecReplicationConfig.getData();
+        }
+      }
+    }
+    return 1;
+  }
+
+  private void initContainerDNsMap(long containerID, Map<Long, Set<UUID>> 
containerDNsMap,
+      UUID datanodeUUID) {
+    containerDNsMap.computeIfAbsent(containerID, key -> Sets.newHashSet());
+    containerDNsMap.get(containerID).add(datanodeUUID);
+  }
+
   @Override
   protected synchronized void cleanup() {
-    containerMap.clear();
+    ratisContainerMap.clear();
+    ratisContainerDNsMap.clear();
+    ecContainerMap.clear();
+    ecContainerDNsMap.clear();
+    reportedContainerIDSet.clear();
   }
 
   @Override
   public String getStatusText() {
-    List<Long> sampleContainers = containerMap.keySet()
-        .stream()
-        .limit(SAMPLE_CONTAINER_DISPLAY_LIMIT)
-        .collect(Collectors.toList());
 
-    String status = String.format("%% of containers with at least one reported"
-            + " replica (=%1.2f) >= safeModeCutoff (=%1.2f)",
+    // ratis container
+    String status = String.format(
+        "%1.2f%% of [Ratis] Containers(%s / %s) with at least one reported 
replica (=%1.2f) >= " +
+        "safeModeCutoff (=%1.2f);",
+        (ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) 
* 100,
+        ratisContainerWithMinReplicas, (long) getRatisMaxContainer(),
         getCurrentContainerThreshold(), this.safeModeCutoff);
 
-    if (!sampleContainers.isEmpty()) {
+    Set<Long> sampleRatisContainers = ratisContainerDNsMap.entrySet().stream().
+        filter(entry -> entry.getValue().isEmpty()).
+        map(Map.Entry::getKey).
+        limit(SAMPLE_CONTAINER_DISPLAY_LIMIT).
+        collect(Collectors.toSet());
+
+    if (!sampleRatisContainers.isEmpty()) {
       String sampleContainerText =
-          "Sample containers not satisfying the criteria : " + 
sampleContainers;
+          "Sample Ratis Containers not satisfying the criteria : " + 
sampleRatisContainers + ";";
       status = status.concat("\n").concat(sampleContainerText);
     }
 
+    // ec container
+    String ecStatus = String.format(
+        "%1.2f%% of [EC] Containers(%s / %s) with at least N reported replica 
(=%1.2f) >= " +
+        "safeModeCutoff (=%1.2f);",
+        (ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100,
+        ecContainerWithMinReplicas, (long) getEcMaxContainer(),
+        getCurrentECContainerThreshold(), this.safeModeCutoff);
+    status = status.concat("\n").concat(ecStatus);
+
+    Set<Long> sampleEcContainers = ecContainerDNsMap.entrySet().stream().
+        filter(entry -> {
+          Long containerId = entry.getKey();
+          int minReplica = getMinReplica(containerId, Boolean.TRUE);
+          Set<UUID> allReplicas = entry.getValue();
+          if (allReplicas.size() >= minReplica) {
+            return false;
+          }
+          return true;
+        }).
+        map(Map.Entry::getKey).
+        limit(SAMPLE_CONTAINER_DISPLAY_LIMIT).
+        collect(Collectors.toSet());
+
+    if (!sampleEcContainers.isEmpty()) {
+      String sampleECContainerText =
+          "Sample EC Containers not satisfying the criteria : " + 
sampleEcContainers + ";";
+      status = status.concat("\n").concat(sampleECContainerText);
+    }
+
     return status;
   }
 
 
   @Override
   public synchronized void refresh(boolean forceRefresh) {
+    List<ContainerInfo> containers = containerManager.getContainers();
     if (forceRefresh) {
-      reInitializeRule();
+      initializeRule(containers);
     } else {
       if (!validate()) {
-        reInitializeRule();
+        initializeRule(containers);
       }
     }
   }
 
-  private void reInitializeRule() {
-    containerMap.clear();

Review Comment:
   The removed `reInitializeRule()` also cleared `containerMap`, but the new code path does not appear to clear the corresponding maps before re-initializing. Can you check whether they need to be cleared?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to