goiri commented on code in PR #4632:
URL: https://github.com/apache/hadoop/pull/4632#discussion_r931635892
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java:
##########
@@ -50,53 +48,12 @@ public void reinitialize(
setPolicyContext(federationPolicyContext);
}
- /**
- * Simply picks from alphabetically-sorted active subclusters based on the
- * hash of quey name. Jobs of the same queue will all be routed to the same
- * sub-cluster, as far as the number of active sub-cluster and their names
- * remain the same.
- *
- * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
- * has to be routed to an appropriate subCluster for execution.
- *
- * @param blackListSubClusters the list of subClusters as identified by
- * {@link SubClusterId} to blackList from the selection of the home
- * subCluster.
- *
- * @return a hash-based chosen {@link SubClusterId} that will be the "home"
- * for this application.
- *
- * @throws YarnException if there are no active subclusters.
- */
@Override
- public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext,
- List<SubClusterId> blackListSubClusters) throws YarnException {
-
- // throws if no active subclusters available
- Map<SubClusterId, SubClusterInfo> activeSubclusters =
- getActiveSubclusters();
-
- FederationPolicyUtils.validateSubClusterAvailability(
- new ArrayList<SubClusterId>(activeSubclusters.keySet()),
- blackListSubClusters);
-
- if (blackListSubClusters != null) {
-
- // Remove from the active SubClusters from StateStore the blacklisted ones
- for (SubClusterId scId : blackListSubClusters) {
- activeSubclusters.remove(scId);
- }
- }
-
- validate(appSubmissionContext);
-
- int chosenPosition = Math.abs(
- appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
-
- List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+ protected SubClusterId chooseSubCluster(String queue,
Review Comment:
Can we have a javadoc in the parent?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -37,34 +35,19 @@
public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
- public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext,
- List<SubClusterId> blacklist) throws YarnException {
-
- // null checks and default-queue behavior
- validate(appSubmissionContext);
-
- Map<SubClusterId, SubClusterInfo> activeSubclusters =
- getActiveSubclusters();
-
- FederationPolicyUtils.validateSubClusterAvailability(
- new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
+ protected SubClusterId chooseSubCluster(
+ String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters)
throws YarnException {
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
- // sub-clusters adjacent to an inactive one), hence we need to count/scan
+ // sub-clusters adja cent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
ArrayList<Float> weightList = new ArrayList<>();
ArrayList<SubClusterId> scIdList = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
- if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
- continue;
- }
- if (entry.getKey() != null
- && activeSubclusters.containsKey(entry.getKey().toId())) {
+ if (entry.getKey() != null &&
preSelectSubClusters.containsKey(entry.getKey().toId())) {
Review Comment:
getValue() possibly too.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java:
##########
@@ -64,29 +60,24 @@ public void
reinitialize(FederationPolicyInitializationContext policyContext)
}
}
- @Override
- public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext,
- List<SubClusterId> blacklist) throws YarnException {
-
- // null checks and default-queue behavior
- validate(appSubmissionContext);
-
- Map<SubClusterId, SubClusterInfo> activeSubclusters =
- getActiveSubclusters();
-
- FederationPolicyUtils.validateSubClusterAvailability(
- new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+ private long getAvailableMemory(SubClusterInfo value) throws YarnException {
+ try {
+ long mem = -1;
+ JSONObject obj = new JSONObject(value.getCapability());
+ mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
Review Comment:
Directly:
```
long mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
```
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java:
##########
@@ -63,4 +72,70 @@ public void validate(ApplicationSubmissionContext
appSubmissionContext)
}
}
+ protected abstract SubClusterId chooseSubCluster(String queue,
+ Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws
YarnException;
+
+ protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
+ ReservationId reservationId, Map<SubClusterId, SubClusterInfo>
activeSubClusters)
+ throws YarnException {
+
+ // if a reservation exists limit scope to the sub-cluster this
+ // reservation is mapped to
+ if (reservationId != null) {
+
+ // note this might throw YarnException if the reservation is
+ // unknown. This is to be expected, and should be handled by
+ // policy invoker.
+ SubClusterId resSubCluster =
+ getPolicyContext().getFederationStateStoreFacade().
+ getReservationHomeSubCluster(reservationId);
+
+ return Collections.singletonMap(resSubCluster,
activeSubClusters.get(resSubCluster));
+ }
+
+ return activeSubClusters;
+ }
+
+ @Override
+ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext
appContext,
+ List<SubClusterId> blackLists) throws YarnException {
+
+ // null checks and default-queue behavior
+ validate(appContext);
+
+ // apply filtering based on reservation location and active sub-clusters
+ Map<SubClusterId, SubClusterInfo> filteredSubClusters =
prefilterSubClusters(
+ appContext.getReservationID(), getActiveSubclusters());
+
+ FederationPolicyUtils.validateSubClusterAvailability(
Review Comment:
Can't we have a method that takes both arguments separately?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -37,34 +35,19 @@
public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
- public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext,
- List<SubClusterId> blacklist) throws YarnException {
-
- // null checks and default-queue behavior
- validate(appSubmissionContext);
-
- Map<SubClusterId, SubClusterInfo> activeSubclusters =
- getActiveSubclusters();
-
- FederationPolicyUtils.validateSubClusterAvailability(
- new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
+ protected SubClusterId chooseSubCluster(
+ String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters)
throws YarnException {
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
- // sub-clusters adjacent to an inactive one), hence we need to count/scan
+ // sub-clusters adja cent to an inactive one), hence we need to count/scan
Review Comment:
Typo: "adja cent" should be "adjacent".
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java:
##########
@@ -128,53 +125,55 @@ public SubClusterId getHomeSubcluster(
ResourceRequest nodeRequest = null;
ResourceRequest rackRequest = null;
ResourceRequest anyRequest = null;
+
for (ResourceRequest rr : rrList) {
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
nodeRequest = rr;
} catch (YarnException e) {
- LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
+ LOG.error("Cannot resolve node.", e);
}
// Handle "rack" requests
try {
resolver.getSubClustersForRack(rr.getResourceName());
rackRequest = rr;
} catch (YarnException e) {
- LOG.error("Cannot resolve rack : {}", e.getLocalizedMessage());
+ LOG.error("Cannot resolve rack.", e);
}
// Handle "ANY" requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())) {
anyRequest = rr;
continue;
}
}
+
if (nodeRequest == null) {
- throw new YarnException("Missing node request");
+ throw new YarnException("Missing node request.");
}
if (rackRequest == null) {
- throw new YarnException("Missing rack request");
+ throw new YarnException("Missing rack request.");
}
if (anyRequest == null) {
- throw new YarnException("Missing any request");
+ throw new YarnException("Missing any request.");
}
- LOG.info(
- "Node request: " + nodeRequest.getResourceName() + ", Rack request: "
- + rackRequest.getResourceName() + ", Any request: " + anyRequest
- .getResourceName());
+
+ LOG.info("Node request: {} , Rack request: {} , Any request: {}.",
Review Comment:
The spaces before the commas look weird ("{} ,"); use "{}, " instead.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -37,34 +35,19 @@
public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
- public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext,
- List<SubClusterId> blacklist) throws YarnException {
-
- // null checks and default-queue behavior
- validate(appSubmissionContext);
-
- Map<SubClusterId, SubClusterInfo> activeSubclusters =
- getActiveSubclusters();
-
- FederationPolicyUtils.validateSubClusterAvailability(
- new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
-
+ protected SubClusterId chooseSubCluster(
+ String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters)
throws YarnException {
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
- // sub-clusters adjacent to an inactive one), hence we need to count/scan
+ // sub-clusters adja cent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
ArrayList<Float> weightList = new ArrayList<>();
ArrayList<SubClusterId> scIdList = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
- if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
- continue;
- }
- if (entry.getKey() != null
- && activeSubclusters.containsKey(entry.getKey().toId())) {
+ if (entry.getKey() != null &&
preSelectSubClusters.containsKey(entry.getKey().toId())) {
Review Comment:
Extract entry.getKey() into a local variable instead of calling it twice.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java:
##########
@@ -128,53 +125,55 @@ public SubClusterId getHomeSubcluster(
ResourceRequest nodeRequest = null;
ResourceRequest rackRequest = null;
ResourceRequest anyRequest = null;
+
for (ResourceRequest rr : rrList) {
// Handle "node" requests
try {
targetId = resolver.getSubClusterForNode(rr.getResourceName());
nodeRequest = rr;
} catch (YarnException e) {
- LOG.error("Cannot resolve node : {}", e.getLocalizedMessage());
+ LOG.error("Cannot resolve node.", e);
Review Comment:
Can we avoid logging the full exception (with its stack trace) here?
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java:
##########
@@ -63,4 +72,70 @@ public void validate(ApplicationSubmissionContext
appSubmissionContext)
}
}
+ protected abstract SubClusterId chooseSubCluster(String queue,
+ Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws
YarnException;
+
+ protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
+ ReservationId reservationId, Map<SubClusterId, SubClusterInfo>
activeSubClusters)
+ throws YarnException {
+
+ // if a reservation exists limit scope to the sub-cluster this
+ // reservation is mapped to
+ if (reservationId != null) {
+
+ // note this might throw YarnException if the reservation is
+ // unknown. This is to be expected, and should be handled by
+ // policy invoker.
+ SubClusterId resSubCluster =
+ getPolicyContext().getFederationStateStoreFacade().
+ getReservationHomeSubCluster(reservationId);
+
+ return Collections.singletonMap(resSubCluster,
activeSubClusters.get(resSubCluster));
+ }
+
+ return activeSubClusters;
+ }
+
+ @Override
+ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext
appContext,
+ List<SubClusterId> blackLists) throws YarnException {
+
+ // null checks and default-queue behavior
+ validate(appContext);
+
+ // apply filtering based on reservation location and active sub-clusters
+ Map<SubClusterId, SubClusterInfo> filteredSubClusters =
prefilterSubClusters(
+ appContext.getReservationID(), getActiveSubclusters());
+
+ FederationPolicyUtils.validateSubClusterAvailability(
+ new ArrayList<>(filteredSubClusters.keySet()), blackLists);
+
+ // remove black SubCluster
+ if (blackLists != null) {
+ blackLists.forEach(filteredSubClusters::remove);
+ }
+
+ // pick the chosen subCluster from the active ones
+ return chooseSubCluster(appContext.getQueue(), filteredSubClusters);
+ }
+
+
+ @Override
+ public SubClusterId
getReservationHomeSubcluster(ReservationSubmissionRequest request)
+ throws YarnException {
+ if (request == null) {
+ throw new FederationPolicyException(
Review Comment:
This can fit on one line.
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java:
##########
@@ -73,9 +56,8 @@ public SubClusterId getHomeSubcluster(
int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList);
if (pickedIndex == -1) {
throw new FederationPolicyException(
- "No positive weight found on active subclusters");
+ "No positive weight found on active subClusters.");
Review Comment:
The previous capitalization ("subclusters") looked more like it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]