goiri commented on a change in pull request #3760:
URL: https://github.com/apache/hadoop/pull/3760#discussion_r796864909



##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending
+ * asks among all sub-clusters.
+ */
+public class ContainerAsksBalancer implements Configurable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAsksBalancer.class);
+
+  public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      YarnConfiguration.FEDERATION_PREFIX
+          + "downgrade-to-other-subcluster-interval-ms";
+  private static final long DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      10000;
+
+  private Configuration conf;
+
+  // Holds the pending requests and allocation history of all sub-clusters
+  private Map<SubClusterId, AMRMClientRelayer> clientRelayers;
+
+  private long subclusterAskTimeOut;
+  private Map<SubClusterId, Map<ExecutionType, Long>> lastRelaxCandidateCount;
+
+  public ContainerAsksBalancer() {
+    this.clientRelayers = new ConcurrentHashMap<>();
+    this.lastRelaxCandidateCount = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+    this.subclusterAskTimeOut = this.conf
+        .getLong(DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
+            DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public void shutdown() {
+    this.lastRelaxCandidateCount.clear();
+    this.clientRelayers.clear();
+  }
+
+  /**
+   * Pass in the AMRMClientRelayer for a new sub-cluster.
+   *
+   * @param subClusterId sub-cluster id
+   * @param relayer      the AMRMClientRelayer for this sub-cluster
+   * @throws YarnException if fails
+   */
+  public void addAMRMClientRelayer(SubClusterId subClusterId,
+      AMRMClientRelayer relayer) throws YarnException {
+    if (this.clientRelayers.containsKey(subClusterId)) {
+      LOG.warn("AMRMClientRelayer already exists for " + subClusterId);
+    }
+    this.clientRelayers.put(subClusterId, relayer);
+
+    Map<ExecutionType, Long> map = new ConcurrentHashMap<>();
+    for (ExecutionType type : ExecutionType.values()) {
+      map.put(type, 0L);
+    }
+    this.lastRelaxCandidateCount.put(subClusterId, map);
+  }
+
+  /**
+   * Modify the output from split-merge (AMRMProxyPolicy). Adding and removing
+   * asks to balance the pending asks in all sub-clusters.
+   */
+  public void adjustAsks() {
+    Map<ResourceRequestSetKey, ResourceRequestSet> pendingAsks =
+        new HashMap<>();
+    Map<ResourceRequestSetKey, Long> pendingTime = new HashMap<>();
+    for (Entry<SubClusterId, AMRMClientRelayer> relayerEntry : this.clientRelayers
+        .entrySet()) {
+      SubClusterId scId = relayerEntry.getKey();
+
+      pendingAsks.clear();
+      pendingTime.clear();
+      relayerEntry.getValue()
+          .gatherReadOnlyPendingAsksInfo(pendingAsks, pendingTime);
+
+      Map<ExecutionType, Long> currentCandidateCount = new HashMap<>();
+      for (Entry<ResourceRequestSetKey, Long> pendingTimeEntry : pendingTime
+          .entrySet()) {
+        if (pendingTimeEntry.getValue() < this.subclusterAskTimeOut) {
+          continue;
+        }
+        ResourceRequestSetKey askKey = pendingTimeEntry.getKey();
+        ResourceRequestSet askSet = pendingAsks.get(askKey);
+        if (!askSet.isANYRelaxable()) {
+          continue;
+        }
+        long value = askSet.getNumContainers();
+        if (currentCandidateCount.containsKey(askKey.getExeType())) {
+          value += currentCandidateCount.get(askKey.getExeType());
+        }
+        currentCandidateCount.put(askKey.getExeType(), value);
+      }
+
+      // Update the pending metrics for the sub-cluster
+      updateRelaxCandidateMetrics(scId, currentCandidateCount);
+    }
+  }
+
+  protected void updateRelaxCandidateMetrics(SubClusterId scId,
+      Map<ExecutionType, Long> currentCandidateCount) {
+    Map<ExecutionType, Long> lastValueMap =
+        this.lastRelaxCandidateCount.get(scId);
+    for (ExecutionType type : ExecutionType.values()) {
+      long newValue = 0;
+      if (currentCandidateCount.containsKey(type)) {
+        newValue = currentCandidateCount.get(type);
+      }
+      long lastValue = lastValueMap.get(type);
+      lastValueMap.put(type, newValue);
+      LOG.debug("updating SCRelaxable " + type + " asks in " + scId + " from "

Review comment:
       Use logger {} placeholders instead of string concatenation.
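       A sketch of the parameterized form (the quoted line is truncated
       above, so the trailing arguments here are illustrative):
       ```java
       LOG.debug("updating SCRelaxable {} asks in {} from {} to {}",
           type, scId, lastValue, newValue);
       ```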

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
##########
@@ -130,4 +137,43 @@ public String toString() {
         + (this.execType.equals(ExecutionType.GUARANTEED) ? " G"
             : " O" + " r:" + this.resource + "]");
   }
+
+  /**
+   * Extract the corresponding ResourceRequestSetKey for an allocated container
+   * from a given set. Return null if not found.
+   *
+   * @param container the allocated container
+   * @param keys the set of keys to look from
+   * @return ResourceRequestSetKey
+   */
+  public static ResourceRequestSetKey extractMatchingKey(Container container,
+      Set<ResourceRequestSetKey> keys) {
+    ResourceRequestSetKey key = new ResourceRequestSetKey(
+        container.getAllocationRequestId(), container.getPriority(),
+        container.getResource(), container.getExecutionType());
+    if (keys.contains(key)) {

Review comment:
       Does this implement hashCode and all?
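       contains() on a HashSet relies on both. A sketch only, assuming the
       field names (allocationRequestId, priority, resource, execType) match
       the constructor and toString() shown above:
       ```java
       @Override
       public boolean equals(Object obj) {
         if (this == obj) {
           return true;
         }
         if (!(obj instanceof ResourceRequestSetKey)) {
           return false;
         }
         ResourceRequestSetKey other = (ResourceRequestSetKey) obj;
         // allocationRequestId is a long and execType an enum; the rest
         // compare with equals()
         return allocationRequestId == other.allocationRequestId
             && priority.equals(other.priority)
             && resource.equals(other.resource)
             && execType == other.execType;
       }

       @Override
       public int hashCode() {
         return java.util.Objects.hash(
             allocationRequestId, priority, resource, execType);
       }
       ```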

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import java.util.AbstractMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Records the allocation history from YarnRM and provide aggregated insights.
+ */
+public class ContainerAllocationHistory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientRelayer.class);
+
+  private int maxEntryCount;
+
+  // Allocate timing history <AllocateTimeStamp, AllocateLatency>
+  private Queue<Entry<Long, Long>> relaxableG = new LinkedList<>();
+
+  public ContainerAllocationHistory(Configuration conf) {
+    this.maxEntryCount =

Review comment:
       ```java
       this.maxEntryCount = conf.getInt(
           YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY,
           YarnConfiguration.DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY);
       ```

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending
+ * asks among all sub-clusters.
+ */
+public class ContainerAsksBalancer implements Configurable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAsksBalancer.class);
+
+  public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      YarnConfiguration.FEDERATION_PREFIX
+          + "downgrade-to-other-subcluster-interval-ms";
+  private static final long DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      10000;
+
+  private Configuration conf;
+
+  // Holds the pending requests and allocation history of all sub-clusters
+  private Map<SubClusterId, AMRMClientRelayer> clientRelayers;
+
+  private long subclusterAskTimeOut;
+  private Map<SubClusterId, Map<ExecutionType, Long>> lastRelaxCandidateCount;
+
+  public ContainerAsksBalancer() {
+    this.clientRelayers = new ConcurrentHashMap<>();
+    this.lastRelaxCandidateCount = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+    this.subclusterAskTimeOut = this.conf
+        .getLong(DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
+            DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public void shutdown() {
+    this.lastRelaxCandidateCount.clear();
+    this.clientRelayers.clear();
+  }
+
+  /**
+   * Pass in the AMRMClientRelayer for a new sub-cluster.
+   *
+   * @param subClusterId sub-cluster id
+   * @param relayer      the AMRMClientRelayer for this sub-cluster
+   * @throws YarnException if fails
+   */
+  public void addAMRMClientRelayer(SubClusterId subClusterId,
+      AMRMClientRelayer relayer) throws YarnException {
+    if (this.clientRelayers.containsKey(subClusterId)) {
+      LOG.warn("AMRMClientRelayer already exists for " + subClusterId);
+    }
+    this.clientRelayers.put(subClusterId, relayer);
+
+    Map<ExecutionType, Long> map = new ConcurrentHashMap<>();
+    for (ExecutionType type : ExecutionType.values()) {
+      map.put(type, 0L);
+    }
+    this.lastRelaxCandidateCount.put(subClusterId, map);
+  }
+
+  /**
+   * Modify the output from split-merge (AMRMProxyPolicy). Adding and removing
+   * asks to balance the pending asks in all sub-clusters.
+   */
+  public void adjustAsks() {
+    Map<ResourceRequestSetKey, ResourceRequestSet> pendingAsks =
+        new HashMap<>();
+    Map<ResourceRequestSetKey, Long> pendingTime = new HashMap<>();
+    for (Entry<SubClusterId, AMRMClientRelayer> relayerEntry : this.clientRelayers
+        .entrySet()) {
+      SubClusterId scId = relayerEntry.getKey();
+
+      pendingAsks.clear();
+      pendingTime.clear();
+      relayerEntry.getValue()

Review comment:
       We should extract relayerEntry.getValue() into a local variable.
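       For example (a sketch, keeping the rest of the loop unchanged):
       ```java
       for (Entry<SubClusterId, AMRMClientRelayer> relayerEntry
           : this.clientRelayers.entrySet()) {
         SubClusterId scId = relayerEntry.getKey();
         AMRMClientRelayer relayer = relayerEntry.getValue();

         pendingAsks.clear();
         pendingTime.clear();
         relayer.gatherReadOnlyPendingAsksInfo(pendingAsks, pendingTime);
         // ... rest of the loop body unchanged
       }
       ```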

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
##########
@@ -795,6 +797,106 @@ public void testSubClusterExpiry() throws Exception {
     checkTotalContainerAllocation(response, 100);
   }
 
+  @After
+  public void cleanup() {
+    ((FederationAMRMProxyPolicy) getPolicy()).shutdown();
+  }
+
+  @Test
+  public void testLoadbasedSubClusterReroute() throws YarnException {
+    int pendingThreshold = 1000;
+
+    LocalityMulticastAMRMProxyPolicy policy =
+        (LocalityMulticastAMRMProxyPolicy) getPolicy();
+    initializePolicy();
+
+    SubClusterId sc0 = SubClusterId.newInstance("0");
+    SubClusterId sc1 = SubClusterId.newInstance("1");
+    SubClusterId sc2 = SubClusterId.newInstance("2");
+    SubClusterId sc3 = SubClusterId.newInstance("3");
+    SubClusterId sc4 = SubClusterId.newInstance("4");
+
+    Set<SubClusterId> scList = new HashSet<>();
+    scList.add(sc0);
+    scList.add(sc1);
+    scList.add(sc2);
+    scList.add(sc3);
+    scList.add(sc4);
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc0,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc1,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+
+    // This cluster is 2 times the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc2,
+        getAllocateResponseWithEnhancedHeadroom(2 * pendingThreshold, 0));
+
+    // This cluster is at the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc3,
+        getAllocateResponseWithEnhancedHeadroom(pendingThreshold, 0));
+
+    // This cluster has zero pending.
+    policy.notifyOfResponse(sc4, getAllocateResponseWithEnhancedHeadroom(0, 0));
+
+    // sc2, sc3 and sc4 should just return the original subcluster.
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc2, pendingThreshold, scList), sc2);
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc3, pendingThreshold, scList), sc3);
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc4, pendingThreshold, scList), sc4);
+
+    // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+    // 1/4, 1/4, 1/2, 1, 2. Let's run tons of random samples, and verify that
+    // the proportion approximately holds.
+    Map<SubClusterId, Integer> counts = new HashMap<SubClusterId, Integer>();
+    counts.put(sc0, 0);
+    counts.put(sc1, 0);
+    counts.put(sc2, 0);
+    counts.put(sc3, 0);
+    counts.put(sc4, 0);
+    int n = 100000;
+    for (int i = 0; i < n; i++) {
+      SubClusterId selectedId =
+          policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      selectedId =
+          policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      // Also try a new SCId that's not active and enabled. Should be rerouted
+      // to sc0-4 with the same distribution as above
+      selectedId = policy
+          .routeNodeRequestIfNeeded(SubClusterId.newInstance("10"),
+              pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+    }
+
+    // The probability should be 1/16, 1/16, 1/8, 1/4, 1/2
+    Assert.assertEquals((double) counts.get(sc0) / n / 3, 0.0625, 0.01);
+    Assert.assertEquals((double) counts.get(sc1) / n / 3, 0.0625, 0.01);
+    Assert.assertEquals((double) counts.get(sc2) / n / 3, 0.125, 0.01);
+    Assert.assertEquals((double) counts.get(sc3) / n / 3, 0.25, 0.01);
+    Assert.assertEquals((double) counts.get(sc4) / n / 3, 0.5, 0.01);

Review comment:
       Can we do 1/2.0 instead of 0.5 etc?
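       Something like the following; note that JUnit's assertEquals takes the
       expected value as its first argument, so this sketch also swaps the
       argument order used above:
       ```java
       Assert.assertEquals(1 / 16.0, (double) counts.get(sc0) / n / 3, 0.01);
       Assert.assertEquals(1 / 16.0, (double) counts.get(sc1) / n / 3, 0.01);
       Assert.assertEquals(1 / 8.0, (double) counts.get(sc2) / n / 3, 0.01);
       Assert.assertEquals(1 / 4.0, (double) counts.get(sc3) / n / 3, 0.01);
       Assert.assertEquals(1 / 2.0, (double) counts.get(sc4) / n / 3, 0.01);
       ```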

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending
+ * asks among all sub-clusters.
+ */
+public class ContainerAsksBalancer implements Configurable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAsksBalancer.class);
+
+  public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      YarnConfiguration.FEDERATION_PREFIX

Review comment:
       Let's use getTimeDuration()
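       That also lets the config value carry a unit suffix (e.g. "10s").
       A sketch, assuming java.util.concurrent.TimeUnit is imported:
       ```java
       this.subclusterAskTimeOut = conf.getTimeDuration(
           DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
           DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
           TimeUnit.MILLISECONDS);
       ```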

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
##########
@@ -130,4 +137,43 @@ public String toString() {
         + (this.execType.equals(ExecutionType.GUARANTEED) ? " G"
             : " O" + " r:" + this.resource + "]");
   }
+
+  /**
+   * Extract the corresponding ResourceRequestSetKey for an allocated container
+   * from a given set. Return null if not found.
+   *
+   * @param container the allocated container
+   * @param keys the set of keys to look from
+   * @return ResourceRequestSetKey
+   */
+  public static ResourceRequestSetKey extractMatchingKey(Container container,
+      Set<ResourceRequestSetKey> keys) {
+    ResourceRequestSetKey key = new ResourceRequestSetKey(
+        container.getAllocationRequestId(), container.getPriority(),
+        container.getResource(), container.getExecutionType());
+    if (keys.contains(key)) {
+      return key;
+    }
+    if (container.getAllocationRequestId() > 0) {
+      // If no exact match, look for the one with the same (non-zero)
+      // allocationRequestId
+      for (ResourceRequestSetKey candidate : keys) {
+        if (candidate.getAllocationRequestId() == container
+            .getAllocationRequestId()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Using possible match for " + key + ": " + candidate);
+          }
+          return candidate;
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("not match found for container " + container);
+      for (ResourceRequestSetKey candidate : keys) {
+        LOG.debug("candidate set keys: " + candidate);

Review comment:
       Use logger {} placeholders.
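       With a {} placeholder the argument is only rendered when debug logging
       is enabled, so the surrounding isDebugEnabled() guard is then only
       needed to skip the loop itself, not the string building:
       ```java
       LOG.debug("candidate set keys: {}", candidate);
       ```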

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import java.util.AbstractMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Records the allocation history from YarnRM and provide aggregated insights.
+ */
+public class ContainerAllocationHistory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientRelayer.class);
+
+  private int maxEntryCount;
+
+  // Allocate timing history <AllocateTimeStamp, AllocateLatency>
+  private Queue<Entry<Long, Long>> relaxableG = new LinkedList<>();
+
+  public ContainerAllocationHistory(Configuration conf) {
+    this.maxEntryCount =
+        conf.getInt(YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY,
+            YarnConfiguration.DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY);
+  }
+
+  public synchronized void addAllocationEntry(Container container,
+      ResourceRequestSet requestSet, long fulfillTimeStamp,
+      long fulfillLatency) {
+    if (!requestSet.isANYRelaxable()) {
+      LOG.info("allocation history ignoring {}, relax locality is false",

Review comment:
       Capitalize?

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import java.util.AbstractMap;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Records the allocation history from YarnRM and provide aggregated insights.
+ */
+public class ContainerAllocationHistory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AMRMClientRelayer.class);
+
+  private int maxEntryCount;
+
+  // Allocate timing history <AllocateTimeStamp, AllocateLatency>
+  private Queue<Entry<Long, Long>> relaxableG = new LinkedList<>();
+
+  public ContainerAllocationHistory(Configuration conf) {
+    this.maxEntryCount =
+        conf.getInt(YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY,
+            YarnConfiguration.DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY);
+  }
+
+  public synchronized void addAllocationEntry(Container container,

Review comment:
       Javadoc?
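       A possible javadoc; the wording is illustrative, inferred from the
       call site in AMRMClientRelayer:
       ```java
       /**
        * Records one fulfilled container allocation for latency tracking.
        *
        * @param container the container that was allocated
        * @param requestSet the pending request set the container matched
        * @param fulfillTimeStamp the time at which the allocation arrived
        * @param fulfillLatency how long the ask was pending, in milliseconds
        */
       public synchronized void addAllocationEntry(Container container,
           ResourceRequestSet requestSet, long fulfillTimeStamp,
           long fulfillLatency) {
         // existing body unchanged
       }
       ```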

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending
+ * asks among all sub-clusters.
+ */
+public class ContainerAsksBalancer implements Configurable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAsksBalancer.class);
+
+  public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      YarnConfiguration.FEDERATION_PREFIX
+          + "downgrade-to-other-subcluster-interval-ms";
+  private static final long DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      10000;
+
+  private Configuration conf;
+
+  // Holds the pending requests and allocation history of all sub-clusters
+  private Map<SubClusterId, AMRMClientRelayer> clientRelayers;
+
+  private long subclusterAskTimeOut;
+  private Map<SubClusterId, Map<ExecutionType, Long>> lastRelaxCandidateCount;
+
+  public ContainerAsksBalancer() {
+    this.clientRelayers = new ConcurrentHashMap<>();
+    this.lastRelaxCandidateCount = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+    this.subclusterAskTimeOut = this.conf
+        .getLong(DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
+            DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public void shutdown() {
+    this.lastRelaxCandidateCount.clear();
+    this.clientRelayers.clear();
+  }
+
+  /**
+   * Pass in the AMRMClientRelayer for a new sub-cluster.
+   *
+   * @param subClusterId sub-cluster id
+   * @param relayer      the AMRMClientRelayer for this sub-cluster
+   * @throws YarnException if fails
+   */
+  public void addAMRMClientRelayer(SubClusterId subClusterId,
+      AMRMClientRelayer relayer) throws YarnException {
+    if (this.clientRelayers.containsKey(subClusterId)) {
+      LOG.warn("AMRMClientRelayer already exists for " + subClusterId);

Review comment:
       Use logger {}?

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/ContainerAsksBalancer.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used by {@link LocalityMulticastAMRMProxyPolicy} to re-balance the pending
+ * asks among all sub-clusters.
+ */
+public class ContainerAsksBalancer implements Configurable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAsksBalancer.class);
+
+  public static final String DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      YarnConfiguration.FEDERATION_PREFIX
+          + "downgrade-to-other-subcluster-interval-ms";
+  private static final long DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL =
+      10000;
+
+  private Configuration conf;
+
+  // Holds the pending requests and allocation history of all sub-clusters
+  private Map<SubClusterId, AMRMClientRelayer> clientRelayers;
+
+  private long subclusterAskTimeOut;
+  private Map<SubClusterId, Map<ExecutionType, Long>> lastRelaxCandidateCount;
+
+  public ContainerAsksBalancer() {
+    this.clientRelayers = new ConcurrentHashMap<>();
+    this.lastRelaxCandidateCount = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    this.conf = config;
+    this.subclusterAskTimeOut = this.conf

Review comment:
       Split the lines so that getLong() stays on the first line.
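       That is, something like:
       ```java
       this.subclusterAskTimeOut = this.conf.getLong(
           DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL,
           DEFAULT_DOWNGRADE_TO_OTHER_SUBCLUSTER_INTERVAL);
       ```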

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -545,14 +732,13 @@ private void reinitialize(
       }
 
       if (activeAndEnabledSC.size() < 1) {
-        throw new NoActiveSubclustersException(
-            "None of the subclusters enabled in this policy (weight>0) are "
-                + "currently active we cannot forward the ResourceRequest(s)");
+          throw new NoActiveSubclustersException(
+              "None of the subclusters enabled in this policy (weight>0) are "
+                  + "currently active we cannot forward the ResourceRequest(s)");
       }
 
       Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
       tmpSCSet.removeAll(timedOutSubClusters);
-

Review comment:
       Avoid this unrelated whitespace change.

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
##########
@@ -240,6 +304,18 @@ public void notifyOfResponse(SubClusterId subClusterId,
       // Handle "node" requests
       try {
         targetId = resolver.getSubClusterForNode(rr.getResourceName());
+
+        // If needed, re-reroute node requests base on SC load
+        if (conf.getBoolean(LOAD_BASED_SC_SELECTOR_ENABLED,
+            DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED)) {
+          int maxPendingThreshold =

Review comment:
       Move getInt to the previous line.

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
##########
@@ -576,6 +583,40 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     this.ask.add(remoteRequest);
   }
 
+  public ContainerAllocationHistory getAllocationHistory() {
+    return this.allocationHistory;
+  }
+
+  private void addAllocationHistoryEntry(Container container,
+      long fulfillTimeStamp, long fulfillLatency) {
+    ResourceRequestSetKey key = ResourceRequestSetKey
+        .extractMatchingKey(container, this.remotePendingAsks.keySet());
+    if (key == null) {
+      LOG.info("allocation history ignoring {}, no matching request key found",
+          container);
+      return;
+    }
+    this.allocationHistory
+        .addAllocationEntry(container, this.remotePendingAsks.get(key),
+            fulfillTimeStamp, fulfillLatency);
+  }
+
+  public void gatherReadOnlyPendingAsksInfo(
+      Map<ResourceRequestSetKey, ResourceRequestSet> pendingAsks,
+      Map<ResourceRequestSetKey, Long> pendingTime) {
+    pendingAsks.clear();
+    pendingTime.clear();
+    synchronized (this) {
+      pendingAsks.putAll(this.remotePendingAsks);
+      for (ResourceRequestSetKey key : pendingAsks.keySet()) {
+        Long startTime = this.askTimeStamp.get(key.getAllocationRequestId());
+        if (startTime != null) {
+          pendingTime.put(key, System.currentTimeMillis() - startTime);

Review comment:
       Extract to elapsedMilliseconds or something like that.
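       For instance (the name is just a suggestion):
       ```java
       long elapsedMillis = System.currentTimeMillis() - startTime;
       pendingTime.put(key, elapsedMillis);
       ```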




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


