goiri commented on a change in pull request #3760:
URL: https://github.com/apache/hadoop/pull/3760#discussion_r764213666
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -232,6 +238,19 @@ public void notifyOfResponse(SubClusterId subClusterId,
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
+
+ // If needed, re-reroute node requests base on SC load
+ // Read from config every time so that it is SCDable
Review comment:
Replace SCDable
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
+ protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ // If targetId is not in the active and enabled SC list, reroute the
traffic
+ if (activeAndEnabledSCs.contains(targetId)) {
+ int targetPendingCount = getSubClusterLoad(targetId);
+ if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+ return targetId;
+ }
+ }
+ SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+ activeAndEnabledSCs);
+ return scId;
+ }
+
+ private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
Review comment:
I think we usually use "choose" instead of "pick"
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
+ protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ // If targetId is not in the active and enabled SC list, reroute the
traffic
+ if (activeAndEnabledSCs.contains(targetId)) {
+ int targetPendingCount = getSubClusterLoad(targetId);
+ if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+ return targetId;
+ }
+ }
+ SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+ activeAndEnabledSCs);
+ return scId;
+ }
+
+ private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ ArrayList<Float> weights = new ArrayList<>();
+ ArrayList<SubClusterId> scIds = new ArrayList<>();
+ int targetLoad = getSubClusterLoad(targetId);
+ if (targetLoad == -1) {
+ // Probably a SC that's not active and enabled. Forcing a reroute
+ targetLoad = Integer.MAX_VALUE;
Review comment:
Not very common to use MAX_VALUE as an error; do we have a cleaner one?
Return -1 does not work?
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
##########
@@ -795,6 +795,100 @@ public void testSubClusterExpiry() throws Exception {
checkTotalContainerAllocation(response, 100);
}
+ @Test
+ public void testPickSubClusterIdForMaxLoadSC() throws YarnException {
+ int pendingThreshold = 1000;
+
+ LocalityMulticastAMRMProxyPolicy policy =
+ (LocalityMulticastAMRMProxyPolicy) getPolicy();
+ initializePolicy();
+
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc0 = SubClusterId.newInstance("0");
+ Resource r0 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc1 = SubClusterId.newInstance("1");
+ Resource r1 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is 2 times the threshold, but not the most loaded.
+ SubClusterId sc2 = SubClusterId.newInstance("2");
+ Resource r2 =
+ Resource.newInstance(Integer.MAX_VALUE - 2 * pendingThreshold, 0);
+ // This cluster is at the threshold, but not the most loaded.
+ SubClusterId sc3 = SubClusterId.newInstance("3");
+ Resource r3 = Resource.newInstance(Integer.MAX_VALUE - pendingThreshold,
0);
+ // This cluster has zero pending.
+ SubClusterId sc4 = SubClusterId.newInstance("4");
+ Resource r4 = Resource.newInstance(Integer.MAX_VALUE - 0, 0);
+
+ Set<SubClusterId> scList = new HashSet<>();
+ scList.add(sc0);
+ scList.add(sc1);
+ scList.add(sc2);
+ scList.add(sc3);
+ scList.add(sc4);
+
+ policy.notifyOfResponse(sc0, getAllocateResponseWithTargetHeadroom(r0));
+ policy.notifyOfResponse(sc1, getAllocateResponseWithTargetHeadroom(r1));
+ policy.notifyOfResponse(sc2, getAllocateResponseWithTargetHeadroom(r2));
+ policy.notifyOfResponse(sc3, getAllocateResponseWithTargetHeadroom(r3));
+ policy.notifyOfResponse(sc4, getAllocateResponseWithTargetHeadroom(r4));
+
+ // sc2, sc3 and sc4 should just return the original subcluster.
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc2, pendingThreshold, scList).equals(sc2));
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc3, pendingThreshold, scList).equals(sc3));
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc4, pendingThreshold, scList).equals(sc4));
+
+ // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+ // 1/4, 1/4, 1/2, 1, 2. Let's run tons of random of samples, and verify
that
+ // the proportion approximately holds.
+ Map<SubClusterId, Integer> counts = new HashMap<SubClusterId, Integer>();
+ counts.put(sc0, 0);
+ counts.put(sc1, 0);
+ counts.put(sc2, 0);
+ counts.put(sc3, 0);
+ counts.put(sc4, 0);
+ int n = 100000;
+ for (int i = 0; i < n; i++) {
+ SubClusterId selectedId =
+ policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+
+ selectedId =
+ policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+
+ // Also try a new SCId that's not active and enabled. Should be rerouted
+ // to sc0-4 with the same distribution as above
+ selectedId = policy.routeNodeRequestIfNeeded(
+ SubClusterId.newInstance("10"), pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+ }
+
+ // The probability should be 1/16, 1/16, 1/8, 1/4, 1/2
+ Assert.assertTrue(
+ approximateEquals((double) counts.get(sc0) / n / 3, 0.0625));
Review comment:
I think there is an assertEquals with an epsilon.
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
+ protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ // If targetId is not in the active and enabled SC list, reroute the
traffic
+ if (activeAndEnabledSCs.contains(targetId)) {
+ int targetPendingCount = getSubClusterLoad(targetId);
+ if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+ return targetId;
+ }
+ }
+ SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+ activeAndEnabledSCs);
+ return scId;
+ }
+
+ private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ ArrayList<Float> weights = new ArrayList<>();
+ ArrayList<SubClusterId> scIds = new ArrayList<>();
+ int targetLoad = getSubClusterLoad(targetId);
+ if (targetLoad == -1) {
+ // Probably a SC that's not active and enabled. Forcing a reroute
+ targetLoad = Integer.MAX_VALUE;
+ }
+
+ for (SubClusterId sc : activeAndEnabledSCs) {
+ int scLoad = getSubClusterLoad(sc);
+ if (scLoad > targetLoad) {
+ // Never mind if it is not the most loaded SC
+ return targetId;
+ }
+
+ /*
+ * Prepare the weights for a random draw among all known SCs.
+ *
+ * For SC with pending bigger than maxThreshold / 2, use maxThreshold /
+ * pending as weight. We multiplied by maxThreshold so that the weights
+ * won't be too small in value.
+ *
+ * For SC with pending less than maxThreshold / 2, we cap the weight at 2
+ * = (maxThreshold / (maxThreshold / 2)) so that SC with small pending
+ * will not get a huge weight and thus get swamped.
+ */
+ if (scLoad <= maxThreshold / 2) {
+ weights.add(2f);
+ } else {
+ weights.add((float) maxThreshold / scLoad);
+ }
+ scIds.add(sc);
+ }
+ if (weights.size() == 0) {
+ return targetId;
+ }
+ return scIds.get(FederationPolicyUtils.getWeightedRandom(weights));
+ }
+
+ private int getSubClusterLoad(SubClusterId subClusterId) {
+ Resource r = this.headroom.get(subClusterId);
+ if (r == null) {
+ return -1;
+ }
+ return Integer.MAX_VALUE - r.getMemory();
Review comment:
What is this return?
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
##########
@@ -795,6 +795,100 @@ public void testSubClusterExpiry() throws Exception {
checkTotalContainerAllocation(response, 100);
}
+ @Test
+ public void testPickSubClusterIdForMaxLoadSC() throws YarnException {
+ int pendingThreshold = 1000;
+
+ LocalityMulticastAMRMProxyPolicy policy =
+ (LocalityMulticastAMRMProxyPolicy) getPolicy();
+ initializePolicy();
+
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc0 = SubClusterId.newInstance("0");
+ Resource r0 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc1 = SubClusterId.newInstance("1");
+ Resource r1 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is 2 times the threshold, but not the most loaded.
+ SubClusterId sc2 = SubClusterId.newInstance("2");
+ Resource r2 =
+ Resource.newInstance(Integer.MAX_VALUE - 2 * pendingThreshold, 0);
+ // This cluster is at the threshold, but not the most loaded.
+ SubClusterId sc3 = SubClusterId.newInstance("3");
+ Resource r3 = Resource.newInstance(Integer.MAX_VALUE - pendingThreshold,
0);
+ // This cluster has zero pending.
+ SubClusterId sc4 = SubClusterId.newInstance("4");
+ Resource r4 = Resource.newInstance(Integer.MAX_VALUE - 0, 0);
+
+ Set<SubClusterId> scList = new HashSet<>();
+ scList.add(sc0);
+ scList.add(sc1);
+ scList.add(sc2);
+ scList.add(sc3);
+ scList.add(sc4);
+
+ policy.notifyOfResponse(sc0, getAllocateResponseWithTargetHeadroom(r0));
+ policy.notifyOfResponse(sc1, getAllocateResponseWithTargetHeadroom(r1));
+ policy.notifyOfResponse(sc2, getAllocateResponseWithTargetHeadroom(r2));
+ policy.notifyOfResponse(sc3, getAllocateResponseWithTargetHeadroom(r3));
+ policy.notifyOfResponse(sc4, getAllocateResponseWithTargetHeadroom(r4));
+
+ // sc2, sc3 and sc4 should just return the original subcluster.
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc2, pendingThreshold, scList).equals(sc2));
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc3, pendingThreshold, scList).equals(sc3));
+ Assert.assertTrue(policy
+ .routeNodeRequestIfNeeded(sc4, pendingThreshold, scList).equals(sc4));
+
+ // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+ // 1/4, 1/4, 1/2, 1, 2. Let's run tons of random of samples, and verify
that
+ // the proportion approximately holds.
+ Map<SubClusterId, Integer> counts = new HashMap<SubClusterId, Integer>();
+ counts.put(sc0, 0);
+ counts.put(sc1, 0);
+ counts.put(sc2, 0);
+ counts.put(sc3, 0);
+ counts.put(sc4, 0);
+ int n = 100000;
+ for (int i = 0; i < n; i++) {
+ SubClusterId selectedId =
+ policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+
+ selectedId =
+ policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+
+ // Also try a new SCId that's not active and enabled. Should be rerouted
+ // to sc0-4 with the same distribution as above
+ selectedId = policy.routeNodeRequestIfNeeded(
+ SubClusterId.newInstance("10"), pendingThreshold, scList);
+ counts.put(selectedId, counts.get(selectedId) + 1);
+ }
+
+ // The probability should be 1/16, 1/16, 1/8, 1/4, 1/2
+ Assert.assertTrue(
+ approximateEquals((double) counts.get(sc0) / n / 3, 0.0625));
+ Assert.assertTrue(
+ approximateEquals((double) counts.get(sc1) / n / 3, 0.0625));
+ Assert
+ .assertTrue(approximateEquals((double) counts.get(sc2) / n / 3,
0.125));
+ Assert
+ .assertTrue(approximateEquals((double) counts.get(sc3) / n / 3, 0.25));
+ Assert.assertTrue(approximateEquals((double) counts.get(sc4) / n / 3,
0.5));
+
+ // Everything should be routed to these five active and enabled SCs
+ Assert.assertEquals(5, counts.size());
+ }
+
+ protected boolean approximateEquals(double a, double b) {
+ LOG.info("Comparing " + a + " and " + b);
+ return Math.abs(a - b) < 0.01;
Review comment:
https://junit.org/junit4/javadoc/latest/org/junit/Assert.html#assertEquals(double,%20double,%20double)
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
+ protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ // If targetId is not in the active and enabled SC list, reroute the
traffic
+ if (activeAndEnabledSCs.contains(targetId)) {
+ int targetPendingCount = getSubClusterLoad(targetId);
+ if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+ return targetId;
+ }
+ }
+ SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+ activeAndEnabledSCs);
+ return scId;
+ }
+
+ private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ ArrayList<Float> weights = new ArrayList<>();
+ ArrayList<SubClusterId> scIds = new ArrayList<>();
+ int targetLoad = getSubClusterLoad(targetId);
+ if (targetLoad == -1) {
+ // Probably a SC that's not active and enabled. Forcing a reroute
+ targetLoad = Integer.MAX_VALUE;
+ }
+
+ for (SubClusterId sc : activeAndEnabledSCs) {
+ int scLoad = getSubClusterLoad(sc);
+ if (scLoad > targetLoad) {
+ // Never mind if it is not the most loaded SC
+ return targetId;
+ }
+
+ /*
Review comment:
We shouldn't use javadoc style inside, if we want a block we just do:
/*
Comments
*/
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -232,6 +238,19 @@ public void notifyOfResponse(SubClusterId subClusterId,
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
+
+ // If needed, re-reroute node requests base on SC load
+ // Read from config every time so that it is SCDable
Review comment:
BTW, wouldn't this be a little expensive for every time to check the
configuration?
Do we need to reload it?
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
Review comment:
Let's follow the javadoc recommendations from yetus.
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
##########
@@ -3917,6 +3917,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
60000; // one minute
+ // Pending container limit
+ public static final String LOAD_BASED_SC_SELECTOR_THRESHOLD =
+
"yarn.nodemanager.least-load-policy-selector.pending-container.threshold";
+ public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD = 10000;
+
+ public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED = false;
Review comment:
The default is usually after the definition.
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
##########
@@ -795,6 +795,100 @@ public void testSubClusterExpiry() throws Exception {
checkTotalContainerAllocation(response, 100);
}
+ @Test
+ public void testPickSubClusterIdForMaxLoadSC() throws YarnException {
+ int pendingThreshold = 1000;
+
+ LocalityMulticastAMRMProxyPolicy policy =
+ (LocalityMulticastAMRMProxyPolicy) getPolicy();
+ initializePolicy();
+
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc0 = SubClusterId.newInstance("0");
+ Resource r0 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is the most overloaded - 4 times the threshold.
+ SubClusterId sc1 = SubClusterId.newInstance("1");
+ Resource r1 =
+ Resource.newInstance(Integer.MAX_VALUE - 4 * pendingThreshold, 0);
+ // This cluster is 2 times the threshold, but not the most loaded.
+ SubClusterId sc2 = SubClusterId.newInstance("2");
+ Resource r2 =
+ Resource.newInstance(Integer.MAX_VALUE - 2 * pendingThreshold, 0);
+ // This cluster is at the threshold, but not the most loaded.
+ SubClusterId sc3 = SubClusterId.newInstance("3");
+ Resource r3 = Resource.newInstance(Integer.MAX_VALUE - pendingThreshold,
0);
+ // This cluster has zero pending.
+ SubClusterId sc4 = SubClusterId.newInstance("4");
+ Resource r4 = Resource.newInstance(Integer.MAX_VALUE - 0, 0);
+
+ Set<SubClusterId> scList = new HashSet<>();
+ scList.add(sc0);
+ scList.add(sc1);
+ scList.add(sc2);
+ scList.add(sc3);
+ scList.add(sc4);
+
+ policy.notifyOfResponse(sc0, getAllocateResponseWithTargetHeadroom(r0));
+ policy.notifyOfResponse(sc1, getAllocateResponseWithTargetHeadroom(r1));
+ policy.notifyOfResponse(sc2, getAllocateResponseWithTargetHeadroom(r2));
+ policy.notifyOfResponse(sc3, getAllocateResponseWithTargetHeadroom(r3));
+ policy.notifyOfResponse(sc4, getAllocateResponseWithTargetHeadroom(r4));
+
+ // sc2, sc3 and sc4 should just return the original subcluster.
+ Assert.assertTrue(policy
Review comment:
Can we use assertEquals?
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -483,6 +502,73 @@ private float getHeadroomWeighting(SubClusterId targetId,
return headroomWeighting;
}
+ /**
+ * When certain subcluster is too loaded, reroute Node requests going there.
+ */
+ protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ // If targetId is not in the active and enabled SC list, reroute the
traffic
+ if (activeAndEnabledSCs.contains(targetId)) {
+ int targetPendingCount = getSubClusterLoad(targetId);
+ if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+ return targetId;
+ }
+ }
+ SubClusterId scId = pickSubClusterIdForMaxLoadSC(targetId, maxThreshold,
+ activeAndEnabledSCs);
+ return scId;
+ }
+
+ private SubClusterId pickSubClusterIdForMaxLoadSC(SubClusterId targetId,
+ int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+ ArrayList<Float> weights = new ArrayList<>();
+ ArrayList<SubClusterId> scIds = new ArrayList<>();
+ int targetLoad = getSubClusterLoad(targetId);
+ if (targetLoad == -1) {
+ // Probably a SC that's not active and enabled. Forcing a reroute
+ targetLoad = Integer.MAX_VALUE;
+ }
+
+ for (SubClusterId sc : activeAndEnabledSCs) {
+ int scLoad = getSubClusterLoad(sc);
+ if (scLoad > targetLoad) {
+ // Never mind if it is not the most loaded SC
+ return targetId;
+ }
+
+ /*
+ * Prepare the weights for a random draw among all known SCs.
+ *
+ * For SC with pending bigger than maxThreshold / 2, use maxThreshold /
+ * pending as weight. We multiplied by maxThreshold so that the weights
+ * won't be too small in value.
+ *
+ * For SC with pending less than maxThreshold / 2, we cap the weight at 2
+ * = (maxThreshold / (maxThreshold / 2)) so that SC with small pending
+ * will not get a huge weight and thus get swamped.
+ */
+ if (scLoad <= maxThreshold / 2) {
+ weights.add(2f);
+ } else {
+ weights.add((float) maxThreshold / scLoad);
+ }
+ scIds.add(sc);
+ }
+ if (weights.size() == 0) {
+ return targetId;
+ }
+ return scIds.get(FederationPolicyUtils.getWeightedRandom(weights));
+ }
+
+ private int getSubClusterLoad(SubClusterId subClusterId) {
+ Resource r = this.headroom.get(subClusterId);
+ if (r == null) {
+ return -1;
+ }
+ return Integer.MAX_VALUE - r.getMemory();
Review comment:
Note the Yetus comment about getMemory being deprecated too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]