minni31 commented on a change in pull request #3760:
URL: https://github.com/apache/hadoop/pull/3760#discussion_r795687281



##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
     return headroomWeighting;
   }
 
+  /**
+   * When certain subcluster is too loaded, reroute Node requests going there.
+   */
+  protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    // If targetId is not in the active and enabled SC list, reroute the 
traffic
+    if (activeAndEnabledSCs.contains(targetId)) {
+      int targetPendingCount = getSubClusterLoad(targetId);
+      if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+        return targetId;
+      }
+    }
+    SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+        activeAndEnabledSCs);
+    return scId;
+  }
+
+  private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    ArrayList<Float> weights = new ArrayList<>();
+    ArrayList<SubClusterId> scIds = new ArrayList<>();
+    int targetLoad = getSubClusterLoad(targetId);
+    if (targetLoad == -1) {
+      // Probably a SC that's not active and enabled. Forcing a reroute
+      targetLoad = Integer.MAX_VALUE;
+    }
+
+    for (SubClusterId sc : activeAndEnabledSCs) {
+      int scLoad = getSubClusterLoad(sc);
+      if (scLoad > targetLoad) {
+        // Never mind if it is not the most loaded SC
+        return targetId;
+      }
+
+      /*
+       * Prepare the weights for a random draw among all known SCs.
+       *
+       * For SC with pending bigger than maxThreshold / 2, use maxThreshold /
+       * pending as weight. We multiplied by maxThreshold so that the weights
+       * won't be too small in value.
+       *
+       * For SC with pending less than maxThreshold / 2, we cap the weight at 2
+       * = (maxThreshold / (maxThreshold / 2)) so that SC with small pending
+       * will not get a huge weight and thus get swamped.
+       */
+      if (scLoad <= maxThreshold / 2) {
+        weights.add(2f);
+      } else {
+        weights.add((float) maxThreshold / scLoad);
+      }
+      scIds.add(sc);
+    }
+    if (weights.size() == 0) {
+      return targetId;
+    }
+    return scIds.get(FederationPolicyUtils.getWeightedRandom(weights));
+  }
+
+  private int getSubClusterLoad(SubClusterId subClusterId) {
+    Resource r = this.headroom.get(subClusterId);
+    if (r == null) {
+      return -1;
+    }
+    return Integer.MAX_VALUE - r.getMemory();

Review comment:
       Code refactored, this is not present anymore.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to