adoroszlai commented on code in PR #4510:
URL: https://github.com/apache/ozone/pull/4510#discussion_r1155117437


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -147,6 +148,16 @@ public class ReplicationManager implements SCMService {
    */
   private LegacyReplicationManager legacyReplicationManager;
 
+  /**
+   * Set of nodes which have been excluded for replication commands due to the
+   * number of commands queued on a datanode. This can be used when generating
+   * reconstruction commands to avoid nodes which are already overloaded. When
+   * the datanode heartbeat is received, the node is removed from this set if
+   * the command count has dropped below the limit.
+   */
+  private final Map<DatanodeDetails, Integer> excludedNodes =

Review Comment:
   In a future patch, I think we can change the map's value type to `Boolean`.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -804,15 +833,45 @@ public long getScmTerm() throws NotLeaderException {
    * Notify ReplicationManager that the command counts on a datanode have been
    * updated via a heartbeat received. This will allow RM to consider the node
    * for container operations if it was previously excluded due to load.
-   * @param datanodeDetails The datanode for which the commands have been
-   *                        updated.
+   * @param datanode The datanode for which the commands have been updated.
    */
-  public void datanodeCommandCountUpdated(DatanodeDetails datanodeDetails) {
-    // For now this is a NOOP, as the plan is to use this notification in a
-    // future change to limit the number of commands scheduled against a DN by
-    // RM.
+  public void datanodeCommandCountUpdated(DatanodeDetails datanode) {
     LOG.debug("Received a notification that the DN command count " +
-        "has been updated for {}", datanodeDetails);
+        "has been updated for {}", datanode);
+    excludedNodes.compute(datanode, (k, v) -> {
+      if (v != null) {
+        // There is an existing mapping, so we may need to remove it
+        try {
+          if (getDatanodeQueuedReplicationCount(datanode)
+              < datanodeReplicationLimit) {
+            // Returning null removes the entry from the map
+            return null;
+          } else {
+            return 1;
+          }
+        } catch (NodeNotFoundException e) {
+          LOG.warn("Unable to find datanode {} in nodeManager. " +
+              "Should not happen.", datanode);
+          return null;
+        }
+      }
+      // If there is no existing mapping, then return null as we don't want to
+      // add a mapping.
+      return null;
+    });

Review Comment:
   Can be simplified a bit by using `computeIfPresent` instead of `compute`:
   
   ```suggestion
       // If there is an existing mapping, we may need to remove it
       excludedNodes.computeIfPresent(datanode, (k, v) -> {
         try {
           if (getDatanodeQueuedReplicationCount(datanode)
               < datanodeReplicationLimit) {
             // Returning null removes the entry from the map
             return null;
           } else {
             return 1;
           }
         } catch (NodeNotFoundException e) {
           LOG.warn("Unable to find datanode {} in nodeManager. " +
               "Should not happen.", datanode);
           return null;
         }
       });
   ```
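
For illustration only, the `computeIfPresent` behaviour the suggestion relies on can be tried in isolation. This is a minimal sketch, not the PR's code: the class `ExcludedNodesSketch`, the `String` node keys, the `queuedCounts` map standing in for `getDatanodeQueuedReplicationCount`, and the choice of `ConcurrentHashMap` are all assumptions, and the `NodeNotFoundException` handling is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the excluded-node bookkeeping in ReplicationManager.
public class ExcludedNodesSketch {
  // Assumption: node IDs are plain strings here instead of DatanodeDetails.
  private final Map<String, Integer> excludedNodes = new ConcurrentHashMap<>();
  // Assumption: replaces the lookup done by getDatanodeQueuedReplicationCount.
  private final Map<String, Integer> queuedCounts = new ConcurrentHashMap<>();
  private final int limit;

  public ExcludedNodesSketch(int limit) {
    this.limit = limit;
  }

  public void exclude(String dn) {
    excludedNodes.put(dn, 1);
  }

  public void setQueued(String dn, int count) {
    queuedCounts.put(dn, count);
  }

  public boolean isExcluded(String dn) {
    return excludedNodes.containsKey(dn);
  }

  public void commandCountUpdated(String dn) {
    // The lambda runs only if dn is already mapped, so a node that was never
    // excluded is never added. Returning null removes the existing entry.
    excludedNodes.computeIfPresent(dn, (k, v) ->
        queuedCounts.getOrDefault(k, 0) < limit ? null : v);
  }

  public static void main(String[] args) {
    ExcludedNodesSketch sketch = new ExcludedNodesSketch(20);
    sketch.exclude("dn1");
    sketch.setQueued("dn1", 25);
    sketch.commandCountUpdated("dn1");
    System.out.println(sketch.isExcluded("dn1")); // still over the limit

    sketch.setQueued("dn1", 5);
    sketch.commandCountUpdated("dn1");
    System.out.println(sketch.isExcluded("dn1")); // dropped below the limit
  }
}
```

With `compute`, the lambda also runs for absent keys and has to return `null` explicitly to avoid adding a mapping; `computeIfPresent` makes that branch unnecessary.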



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -567,18 +592,12 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo,
         = new ArrayList<>();
     for (DatanodeDetails dn : datanodes) {
       try {
-        Map<Type, Integer> counts = nodeManager.getTotalDatanodeCommandCounts(
-            dn, Type.replicateContainerCommand,
-            Type.reconstructECContainersCommand);
-        int replicateCount = counts.get(Type.replicateContainerCommand);
-        int reconstructCount = counts.get(Type.reconstructECContainersCommand);
-        int totalCount = replicateCount
-            + reconstructCount * reconstructionCommandWeight;
+        int totalCount = getDatanodeQueuedReplicationCount(dn);
         if (totalCount >= datanodeReplicationLimit) {
           LOG.debug("Datanode {} has reached the maximum number of queued " +
-              "commands, replication: {}, reconstruction: {} * {})",
-              dn, replicateCount, reconstructCount,
-              reconstructionCommandWeight);
+              "commands, replication + reconstruction * {}: {})",
+              reconstructionCommandWeight, dn, totalCount);

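Review Comment:
   The arguments should follow the order of the `{}` placeholders: the datanode first, then the weight, then the total count.
   
   ```suggestion
                 dn, reconstructionCommandWeight, totalCount);
   ```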
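
For context, the lines removed in the hunk above compute the total as the replication count plus the reconstruction count multiplied by `reconstructionCommandWeight`. The sketch below restates that arithmetic on its own. It is an assumption about what a helper like `getDatanodeQueuedReplicationCount` might do, not its actual body: the class `QueuedCountSketch`, the local `Type` enum, and the use of `getOrDefault` are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical restatement of the weighted count from the removed diff lines.
public class QueuedCountSketch {
  // Assumption: a local enum mirroring the two command types named in the diff.
  public enum Type { replicateContainerCommand, reconstructECContainersCommand }

  public static int queuedReplicationCount(Map<Type, Integer> counts,
      int reconstructionCommandWeight) {
    // getOrDefault avoids a NullPointerException from unboxing a missing count.
    int replicateCount =
        counts.getOrDefault(Type.replicateContainerCommand, 0);
    int reconstructCount =
        counts.getOrDefault(Type.reconstructECContainersCommand, 0);
    return replicateCount + reconstructCount * reconstructionCommandWeight;
  }

  public static void main(String[] args) {
    Map<Type, Integer> counts = new EnumMap<>(Type.class);
    counts.put(Type.replicateContainerCommand, 4);
    counts.put(Type.reconstructECContainersCommand, 2);
    // 4 + 2 * 3 = 10
    System.out.println(queuedReplicationCount(counts, 3));
  }
}
```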



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

