YARN-2888. Corrective mechanisms for rebalancing NM container queues. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0ac18d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0ac18d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0ac18d0

Branch: refs/heads/trunk
Commit: f0ac18d001d97914a9ee810b1fab56c5cebff830
Parents: 1f2794b
Author: Arun Suresh <asur...@apache.org>
Authored: Fri May 13 13:38:36 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Fri May 13 13:38:36 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  48 +++-
 .../yarn/conf/TestYarnConfigurationFields.java  |  14 +-
 .../protocolrecords/NodeHeartbeatResponse.java  |   4 +
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  42 +++
 .../api/records/ContainerQueuingLimit.java      |  44 +++
 .../impl/pb/ContainerQueuingLimitPBImpl.java    |  80 ++++++
 .../yarn_server_common_service_protos.proto     |   6 +
 .../hadoop/yarn/server/nodemanager/Context.java |   4 +-
 .../yarn/server/nodemanager/NodeManager.java    |   8 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      | 171 ++++++------
 .../containermanager/ContainerManager.java      |  42 +++
 .../containermanager/ContainerManagerImpl.java  |  12 +-
 .../queuing/QueuingContainerManagerImpl.java    |  38 +++
 .../amrmproxy/BaseAMRMProxyTest.java            |   3 +-
 .../server/resourcemanager/ClusterMonitor.java  |   6 +-
 .../DistributedSchedulingService.java           | 194 +++++++------
 .../yarn/server/resourcemanager/RMContext.java  |   4 +
 .../server/resourcemanager/RMContextImpl.java   |  14 +
 .../server/resourcemanager/ResourceManager.java |   2 +
 .../resourcemanager/ResourceTrackerService.java |   7 +
 .../distributed/NodeQueueLoadMonitor.java       | 271 +++++++++++++++++++
 .../distributed/QueueLimitCalculator.java       | 125 +++++++++
 .../scheduler/distributed/TopKNodeSelector.java | 223 ---------------
 .../distributed/TestNodeQueueLoadMonitor.java   | 195 +++++++++++++
 .../distributed/TestTopKNodeSelector.java       | 147 ----------
 25 files changed, 1156 insertions(+), 548 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e9c8ea9..0b150c2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -344,17 +344,47 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.top-k";
   public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
 
-  /** Frequency for computing Top K Best Nodes */
-  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
-      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
-  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
-
-  /** Comparator for determining Node Load for Distributed Scheduling */
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
-      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+  /** Frequency for computing least loaded NMs. */
+  public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
+      YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
+  public static final long
+      NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
+
+  /** Comparator for determining Node Load for Distributed Scheduling. */
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
+      YARN_PREFIX + "nm-container-queuing.load-comparator";
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
       "QUEUE_LENGTH";
 
+  /** Value of standard deviation used for calculation of queue limit
+   * thresholds. */
+  public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV =
+      YARN_PREFIX + "nm-container-queuing.queue-limit-stdev";
+  public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
+      1.0f;
+
+  /** Min length of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.min-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
+
+  /** Max length of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.max-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
+
+  /** Min wait time of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
+      1;
+
+  /** Max wait time of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
+      10;
+
   /**
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * only is used by the FileSystemRMStateStore to setup right file-system

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c92a276..61b698d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -135,9 +135,19 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
     configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
     configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
 
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index f8a1320..bd04a0d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
@@ -78,4 +79,7 @@ public interface NodeHeartbeatResponse {
 
   List<Container> getContainersToDecrease();
   void addAllContainersToDecrease(Collection<Container> containersToDecrease);
+
+  ContainerQueuingLimit getContainerQueuingLimit();
+  void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 224e50b..a259158 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
 import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
@@ -46,8 +47,10 @@ import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import 
org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
@@ -65,6 +68,7 @@ public class NodeHeartbeatResponsePBImpl extends
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
+  private ContainerQueuingLimit containerQueuingLimit = null;
   private List<Container> containersToDecrease = null;
   private List<SignalContainerRequest> containersToSignal = null;
 
@@ -102,6 +106,10 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.setNmTokenMasterKey(
           convertToProtoFormat(this.nmTokenMasterKey));
     }
+    if (this.containerQueuingLimit != null) {
+      builder.setContainerQueuingLimit(
+          convertToProtoFormat(this.containerQueuingLimit));
+    }
     if (this.systemCredentials != null) {
       addSystemCredentialsToProto();
     }
@@ -197,6 +205,30 @@ public class NodeHeartbeatResponsePBImpl extends
   }
 
   @Override
+  public ContainerQueuingLimit getContainerQueuingLimit() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.containerQueuingLimit != null) {
+      return this.containerQueuingLimit;
+    }
+    if (!p.hasContainerQueuingLimit()) {
+      return null;
+    }
+    this.containerQueuingLimit =
+        convertFromProtoFormat(p.getContainerQueuingLimit());
+    return this.containerQueuingLimit;
+  }
+
+  @Override
+  public void setContainerQueuingLimit(ContainerQueuingLimit
+      containerQueuingLimit) {
+    maybeInitBuilder();
+    if (containerQueuingLimit == null) {
+      builder.clearContainerQueuingLimit();
+    }
+    this.containerQueuingLimit = containerQueuingLimit;
+  }
+
+  @Override
   public NodeAction getNodeAction() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasNodeAction()) {
@@ -638,6 +670,16 @@ public class NodeHeartbeatResponsePBImpl extends
     builder.addAllContainersToSignal(iterable);
   }
 
+  private ContainerQueuingLimit convertFromProtoFormat(
+      ContainerQueuingLimitProto p) {
+    return new ContainerQueuingLimitPBImpl(p);
+  }
+
+  private ContainerQueuingLimitProto convertToProtoFormat(
+      ContainerQueuingLimit c) {
+    return ((ContainerQueuingLimitPBImpl)c).getProto();
+  }
+
   private SignalContainerRequestPBImpl convertFromProtoFormat(
       SignalContainerRequestProto p) {
     return new SignalContainerRequestPBImpl(p);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
new file mode 100644
index 0000000..a8ae927
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Used to hold max wait time / queue length information to be
+ * passed back to the NodeManager.
+ */
+public abstract class ContainerQueuingLimit {
+
+  public static ContainerQueuingLimit newInstance() {
+    ContainerQueuingLimit containerQueuingLimit =
+        Records.newRecord(ContainerQueuingLimit.class);
+    containerQueuingLimit.setMaxQueueLength(-1);
+    containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
+    return containerQueuingLimit;
+  }
+
+  public abstract int getMaxQueueLength();
+
+  public abstract void setMaxQueueLength(int queueLength);
+
+  public abstract int getMaxQueueWaitTimeInMs();
+
+  public abstract void setMaxQueueWaitTimeInMs(int waitTime);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
new file mode 100644
index 0000000..d071a08
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+
+/**
+ * Implementation of ContainerQueuingLimit interface.
+ */
+public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit {
+
+  private ContainerQueuingLimitProto proto =
+      ContainerQueuingLimitProto.getDefaultInstance();
+  private ContainerQueuingLimitProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public ContainerQueuingLimitPBImpl() {
+    builder = ContainerQueuingLimitProto.newBuilder();
+  }
+
+  public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) {
+    this.proto = proto;
+    this.viaProto = true;
+  }
+
+  public ContainerQueuingLimitProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return  proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ContainerQueuingLimitProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getMaxQueueWaitTimeInMs() {
+    ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMaxQueueWaitTimeInMs();
+  }
+
+  @Override
+  public void setMaxQueueWaitTimeInMs(int waitTime) {
+    maybeInitBuilder();
+    builder.setMaxQueueWaitTimeInMs(waitTime);
+  }
+
+  @Override
+  public int getMaxQueueLength() {
+    ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMaxQueueLength();
+  }
+
+  @Override
+  public void setMaxQueueLength(int queueLength) {
+    maybeInitBuilder();
+    builder.setMaxQueueLength(queueLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 786d8ee..a977653 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto {
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
   repeated ContainerProto containers_to_decrease = 12;
   repeated SignalContainerRequestProto containers_to_signal = 13;
+  optional ContainerQueuingLimitProto container_queuing_limit = 14;
+}
+
+message ContainerQueuingLimitProto {
+  optional int32 max_queue_length = 1;
+  optional int32 max_queue_wait_time_in_ms = 2;
 }
 
 message SystemCredentialsForAppsProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 626d0a1..cfcf1bd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -82,7 +82,7 @@ public interface Context {
 
   NodeHealthStatus getNodeHealthStatus();
 
-  ContainerManagementProtocol getContainerManager();
+  ContainerManager getContainerManager();
 
   NodeResourceMonitor getNodeResourceMonitor();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index b48706d..6ca7ffe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +59,7 @@ import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -465,7 +465,7 @@ public class NodeManager extends CompositeService
 
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMTokenSecretManagerInNM nmTokenSecretManager;
-    private ContainerManagementProtocol containerManager;
+    private ContainerManager containerManager;
     private NodeResourceMonitor nodeResourceMonitor;
     private final LocalDirsHandlerService dirsHandler;
     private final ApplicationACLsManager aclsManager;
@@ -555,11 +555,11 @@ public class NodeManager extends CompositeService
     }
 
     @Override
-    public ContainerManagementProtocol getContainerManager() {
+    public ContainerManager getContainerManager() {
       return this.containerManager;
     }
 
-    public void setContainerManager(ContainerManagementProtocol 
containerManager) {
+    public void setContainerManager(ContainerManager containerManager) {
       this.containerManager = containerManager;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index c0f02e9..9b74f8d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,13 +71,14 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -401,8 +402,7 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
 
     LOG.info(successfullRegistrationMsg);
     LOG.info("Notifying ContainerManager to unblock new container-requests");
-    ((ContainerManagerImpl) this.context.getContainerManager())
-      .setBlockNewContainerRequests(false);
+    this.context.getContainerManager().setBlockNewContainerRequests(false);
   }
 
   private List<ApplicationId> createKeepAliveApplicationList() {
@@ -465,10 +465,8 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
    * @return Resource utilization of all the containers.
    */
   private ResourceUtilization getContainersUtilization() {
-    ContainerManagerImpl containerManager =
-        (ContainerManagerImpl) this.context.getContainerManager();
     ContainersMonitor containersMonitor =
-        containerManager.getContainersMonitor();
+        this.context.getContainerManager().getContainersMonitor();
     return containersMonitor.getContainersUtilization();
   }
 
@@ -735,7 +733,6 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
             Set<NodeLabel> nodeLabelsForHeartbeat =
                 nodeLabelsHandler.getNodeLabelsForHeartbeat();
             NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
-
             NodeHeartbeatRequest request =
                 NodeHeartbeatRequest.newInstance(nodeStatus,
                     NodeStatusUpdaterImpl.this.context
@@ -760,82 +757,70 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
 
-            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
-              LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part 
of"
-                  + " heartbeat, hence shutting down.");
-              LOG.warn("Message from ResourceManager: "
-                  + response.getDiagnosticsMessage());
-              context.setDecommissioned(true);
-              dispatcher.getEventHandler().handle(
-                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-              break;
-            }
-            if (response.getNodeAction() == NodeAction.RESYNC) {
-              LOG.warn("Node is out of sync with ResourceManager,"
-                  + " hence resyncing.");
-              LOG.warn("Message from ResourceManager: "
-                  + response.getDiagnosticsMessage());
-              // Invalidate the RMIdentifier while resync
-              NodeStatusUpdaterImpl.this.rmIdentifier =
-                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
-              dispatcher.getEventHandler().handle(
-                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
-              pendingCompletedContainers.clear();
-              break;
-            }
-
-            nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
-
-            // Explicitly put this method after checking the resync response. 
We
-            // don't want to remove the completed containers before resync
-            // because these completed containers will be reported back to RM
-            // when NM re-registers with RM.
-            // Only remove the cleanedup containers that are acked
-            removeOrTrackCompletedContainersFromContext(response
+            if (!handleShutdownOrResyncCommand(response)) {
+              nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
+                  response);
+
+              // Explicitly put this method after checking the resync
+              // response. We
+              // don't want to remove the completed containers before resync
+              // because these completed containers will be reported back to RM
+              // when NM re-registers with RM.
+              // Only remove the cleanedup containers that are acked
+              removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
-            logAggregationReportForAppsTempList.clear();
-            lastHeartbeatID = response.getResponseId();
-            List<ContainerId> containersToCleanup = response
-                .getContainersToCleanup();
-            if (!containersToCleanup.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrCompletedContainersEvent(containersToCleanup,
-                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
-            }
-            List<ApplicationId> appsToCleanup =
-                response.getApplicationsToCleanup();
-            //Only start tracking for keepAlive on FINISH_APP
-            trackAppsForKeepAlive(appsToCleanup);
-            if (!appsToCleanup.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrCompletedAppsEvent(appsToCleanup,
-                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
-            }
-
-            Map<ApplicationId, ByteBuffer> systemCredentials =
-                response.getSystemCredentialsForApps();
-            if (systemCredentials != null && !systemCredentials.isEmpty()) {
-              ((NMContext) context)
-                
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
-            }
+              logAggregationReportForAppsTempList.clear();
+              lastHeartbeatID = response.getResponseId();
+              List<ContainerId> containersToCleanup = response
+                  .getContainersToCleanup();
+              if (!containersToCleanup.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrCompletedContainersEvent(containersToCleanup,
+                        CMgrCompletedContainersEvent.Reason
+                            .BY_RESOURCEMANAGER));
+              }
+              List<ApplicationId> appsToCleanup =
+                  response.getApplicationsToCleanup();
+              //Only start tracking for keepAlive on FINISH_APP
+              trackAppsForKeepAlive(appsToCleanup);
+              if (!appsToCleanup.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrCompletedAppsEvent(appsToCleanup,
+                        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+              }
+              Map<ApplicationId, ByteBuffer> systemCredentials =
+                  response.getSystemCredentialsForApps();
+              if (systemCredentials != null && !systemCredentials.isEmpty()) {
+                ((NMContext) context).setSystemCrendentialsForApps(
+                    parseCredentials(systemCredentials));
+              }
+              List<org.apache.hadoop.yarn.api.records.Container>
+                  containersToDecrease = response.getContainersToDecrease();
+              if (!containersToDecrease.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrDecreaseContainersResourceEvent(
+                        containersToDecrease)
+                );
+              }
 
-            List<org.apache.hadoop.yarn.api.records.Container>
-                containersToDecrease = response.getContainersToDecrease();
-            if (!containersToDecrease.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrDecreaseContainersResourceEvent(containersToDecrease)
-              );
-            }
+              // SignalContainer request originally comes from end users via
+              // ClientRMProtocol's SignalContainer. Forward the request to
+              // ContainerManager which will dispatch the event to
+              // ContainerLauncher.
+              List<SignalContainerRequest> containersToSignal = response
+                  .getContainersToSignalList();
+              if (containersToSignal.size() != 0) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrSignalContainersEvent(containersToSignal));
+              }
 
-            // SignalContainer request originally comes from end users via
-            // ClientRMProtocol's SignalContainer. Forward the request to
-            // ContainerManager which will dispatch the event to 
ContainerLauncher.
-            List<SignalContainerRequest> containersToSignal = response
-                .getContainersToSignalList();
-            if (containersToSignal.size() != 0) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrSignalContainersEvent(containersToSignal));
+              // Update QueuingLimits if ContainerManager supports queuing
+              ContainerQueuingLimit queuingLimit =
+                  response.getContainerQueuingLimit();
+              if (queuingLimit != null) {
+                context.getContainerManager().updateQueuingLimit(queuingLimit);
+              }
             }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect 
RM
@@ -883,6 +868,34 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
     statusUpdater.start();
   }
 
+  private boolean handleShutdownOrResyncCommand(
+      NodeHeartbeatResponse response) {
+    if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+      LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
+          + " heartbeat, hence shutting down.");
+      LOG.warn("Message from ResourceManager: "
+          + response.getDiagnosticsMessage());
+      context.setDecommissioned(true);
+      dispatcher.getEventHandler().handle(
+          new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+      return true;
+    }
+    if (response.getNodeAction() == NodeAction.RESYNC) {
+      LOG.warn("Node is out of sync with ResourceManager,"
+          + " hence resyncing.");
+      LOG.warn("Message from ResourceManager: "
+          + response.getDiagnosticsMessage());
+      // Invalidate the RMIdentifier while resync
+      NodeStatusUpdaterImpl.this.rmIdentifier =
+          ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+      dispatcher.getEventHandler().handle(
+          new NodeManagerEvent(NodeManagerEventType.RESYNC));
+      pendingCompletedContainers.clear();
+      return true;
+    }
+    return false;
+  }
+
   private List<LogAggregationReport> getLogAggregationReportsForApps(
       ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) 
{
     LogAggregationReport status;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
new file mode 100644
index 0000000..0da02b3
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+    .ContainersMonitor;
+
+/**
+ * The ContainerManager is an entity that manages the life cycle of Containers.
+ */
+public interface ContainerManager extends ServiceStateChangeListener,
+    ContainerManagementProtocol,
+    EventHandler<ContainerManagerEvent> {
+
+  ContainersMonitor getContainersMonitor();
+
+  void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
+
+  void setBlockNewContainerRequests(boolean blockNewContainerRequests);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 4383d2b..e1c4131 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -53,7 +53,6 @@ import 
org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -95,6 +94,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@@ -150,8 +150,7 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerManagerImpl extends CompositeService implements
-    ServiceStateChangeListener, ContainerManagementProtocol,
-    EventHandler<ContainerManagerEvent> {
+    ContainerManager {
 
   /**
    * Extra duration to wait for applications to be killed on shutdown.
@@ -410,6 +409,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
     }
   }
 
+  @Override
   public ContainersMonitor getContainersMonitor() {
     return this.containersMonitor;
   }
@@ -1398,6 +1398,7 @@ public class ContainerManagerImpl extends 
CompositeService implements
     }
   }
 
+  @Override
   public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
     this.blockNewContainerRequests.set(blockNewContainerRequests);
   }
@@ -1434,4 +1435,9 @@ public class ContainerManagerImpl extends 
CompositeService implements
   protected boolean isServiceStopped() {
     return serviceStopped;
   }
+
+  @Override
+  public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
+    LOG.trace("Implementation does not support queuing of Containers !!");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
index 94d3172..5b1b77a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -83,6 +84,7 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
   private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
 
   private Set<ContainerId> opportunisticContainersToKill;
+  private final ContainerQueuingLimit queuingLimit;
 
   public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -95,6 +97,7 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
     this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
     this.opportunisticContainersToKill = Collections.synchronizedSet(
         new HashSet<ContainerId>());
+    this.queuingLimit = ContainerQueuingLimit.newInstance();
   }
 
   @Override
@@ -526,6 +529,41 @@ public class QueuingContainerManagerImpl extends 
ContainerManagerImpl {
     }
   }
 
+  @Override
+  public void updateQueuingLimit(ContainerQueuingLimit limit) {
+    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
+    // TODO: Include wait time as well once it is implemented
+    if (this.queuingLimit.getMaxQueueLength() > -1) {
+      shedQueuedOpportunisticContainers();
+    }
+  }
+
+  private void shedQueuedOpportunisticContainers() {
+    int numAllowed = this.queuingLimit.getMaxQueueLength();
+    Iterator<AllocatedContainerInfo> containerIter =
+        queuedOpportunisticContainers.iterator();
+    while (containerIter.hasNext()) {
+      AllocatedContainerInfo cInfo = containerIter.next();
+      if (numAllowed <= 0) {
+        containerIter.remove();
+        ContainerTokenIdentifier containerTokenIdentifier = this.context
+            .getQueuingContext().getQueuedContainers().remove(
+                cInfo.getContainerTokenIdentifier().getContainerID());
+        // The Container might have already started while we were
+        // iterating..
+        if (containerTokenIdentifier != null) {
+          this.context.getQueuingContext().getKilledQueuedContainers()
+              .putIfAbsent(cInfo.getContainerTokenIdentifier(),
+                  "Container De-queued to meet global queuing limits. "
+                      + "Max Queue length["
+                      + this.queuingLimit.getMaxQueueLength() + "]");
+        }
+      }
+      numAllowed--;
+    }
+  }
+
+
   static class AllocatedContainerInfo {
     private final ContainerTokenIdentifier containerTokenIdentifier;
     private final StartContainerRequest startRequest;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index ce405f8..6ff74c9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -642,7 +643,7 @@ public abstract class BaseAMRMProxyTest {
     }
 
     @Override
-    public ContainerManagementProtocol getContainerManager() {
+    public ContainerManager getContainerManager() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
index 4fd62d0..5fb05ca 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -24,13 +24,17 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.List;
 
+/**
+ * Implementations of this class are notified of changes to the cluster's 
state,
+ * such as node addition, removal and updates.
+ */
 public interface ClusterMonitor {
 
   void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
 
   void removeNode(RMNode removedRMNode);
 
-  void nodeUpdate(RMNode rmNode);
+  void updateNode(RMNode rmNode);
 
   void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index 170d91a..a93f683 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import 
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
-    .TopKNodeSelector;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
+
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -57,7 +59,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -76,30 +77,64 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
   private static final Log LOG =
       LogFactory.getLog(DistributedSchedulingService.class);
 
-  private final TopKNodeSelector clusterMonitor;
+  private final NodeQueueLoadMonitor nodeMonitor;
 
   private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
       new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
       new ConcurrentHashMap<>();
+  private final int k;
 
   public DistributedSchedulingService(RMContext rmContext,
       YarnScheduler scheduler) {
     super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
-    int k = rmContext.getYarnConfiguration().getInt(
+    this.k = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.DIST_SCHEDULING_TOP_K,
         YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
-    long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
-    TopKNodeSelector.TopKComparator comparator =
-        TopKNodeSelector.TopKComparator.valueOf(
+    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
+        YarnConfiguration.
+            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+    NodeQueueLoadMonitor.LoadComparator comparator =
+        NodeQueueLoadMonitor.LoadComparator.valueOf(
             rmContext.getYarnConfiguration().get(
-                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
-                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
-    TopKNodeSelector topKSelector =
-        new TopKNodeSelector(k, topKComputationInterval, comparator);
-    this.clusterMonitor = topKSelector;
+                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
+                YarnConfiguration.
+                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
+
+    NodeQueueLoadMonitor topKSelector =
+        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+
+    float sigma = rmContext.getYarnConfiguration()
+        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
+            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
+
+    int limitMin, limitMax;
+
+    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
+    } else {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
+    }
+
+    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
+    this.nodeMonitor = topKSelector;
   }
 
   @Override
@@ -189,7 +224,7 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
 
     // Set nodes to be used for scheduling
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.clusterMonitor.selectNodes()));
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
     return dsResp;
   }
 
@@ -201,7 +236,7 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
         (DistSchedAllocateResponse.class);
     dsResp.setAllocateResponse(response);
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.clusterMonitor.selectNodes()));
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
     return dsResp;
   }
 
@@ -229,67 +264,72 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
   @Override
   public void handle(SchedulerEvent event) {
     switch (event.getType()) {
-      case NODE_ADDED:
-        if (!(event instanceof NodeAddedSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeAddedSchedulerEvent nodeAddedEvent = 
(NodeAddedSchedulerEvent)event;
-        clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
-            nodeAddedEvent.getAddedRMNode());
-        addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-            nodeAddedEvent.getAddedRMNode().getNodeID());
-        addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-            nodeAddedEvent.getAddedRMNode().getNodeID());
-        break;
-      case NODE_REMOVED:
-        if (!(event instanceof NodeRemovedSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeRemovedSchedulerEvent nodeRemovedEvent =
-            (NodeRemovedSchedulerEvent)event;
-        clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-        removeFromMapping(rackToNode,
-            nodeRemovedEvent.getRemovedRMNode().getRackName(),
-            nodeRemovedEvent.getRemovedRMNode().getNodeID());
-        removeFromMapping(hostToNode,
-            nodeRemovedEvent.getRemovedRMNode().getHostName(),
-            nodeRemovedEvent.getRemovedRMNode().getNodeID());
-        break;
-      case NODE_UPDATE:
-        if (!(event instanceof NodeUpdateSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeUpdateSchedulerEvent nodeUpdatedEvent = 
(NodeUpdateSchedulerEvent)event;
-        clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
-        break;
-      case NODE_RESOURCE_UPDATE:
-        if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
-            (NodeResourceUpdateSchedulerEvent)event;
-        clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
-            nodeResourceUpdatedEvent.getResourceOption());
-        break;
-
-      // <-- IGNORED EVENTS : START -->
-      case APP_ADDED:
-        break;
-      case APP_REMOVED:
-        break;
-      case APP_ATTEMPT_ADDED:
-        break;
-      case APP_ATTEMPT_REMOVED:
-        break;
-      case CONTAINER_EXPIRED:
-        break;
-      case NODE_LABELS_UPDATE:
-        break;
-      // <-- IGNORED EVENTS : END -->
-      default:
-        LOG.error("Unknown event arrived at DistributedSchedulingService: "
-            + event.toString());
+    case NODE_ADDED:
+      if (!(event instanceof NodeAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
+      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+          nodeAddedEvent.getAddedRMNode());
+      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      break;
+    case NODE_REMOVED:
+      if (!(event instanceof NodeRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeRemovedSchedulerEvent nodeRemovedEvent =
+          (NodeRemovedSchedulerEvent) event;
+      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+      removeFromMapping(rackToNode,
+          nodeRemovedEvent.getRemovedRMNode().getRackName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      removeFromMapping(hostToNode,
+          nodeRemovedEvent.getRemovedRMNode().getHostName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      break;
+    case NODE_UPDATE:
+      if (!(event instanceof NodeUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
+          event;
+      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+      break;
+    case NODE_RESOURCE_UPDATE:
+      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+          (NodeResourceUpdateSchedulerEvent) event;
+      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+          nodeResourceUpdatedEvent.getResourceOption());
+      break;
+
+    // <-- IGNORED EVENTS : START -->
+    case APP_ADDED:
+      break;
+    case APP_REMOVED:
+      break;
+    case APP_ATTEMPT_ADDED:
+      break;
+    case APP_ATTEMPT_REMOVED:
+      break;
+    case CONTAINER_EXPIRED:
+      break;
+    case NODE_LABELS_UPDATE:
+      break;
+    // <-- IGNORED EVENTS : END -->
+    default:
+      LOG.error("Unknown event arrived at DistributedSchedulingService: "
+          + event.toString());
     }
+
   }
 
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return nodeMonitor.getThresholdCalculator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index f50da3b..b063ecb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -39,6 +39,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -139,4 +141,6 @@ public interface RMContext {
   void setLeaderElectorService(LeaderElectorService elector);
 
   LeaderElectorService getLeaderElectorService();
+
+  QueueLimitCalculator getNodeManagerQueueLimitCalculator();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index ec2aeb7..d228388 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -43,6 +43,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -74,6 +76,8 @@ public class RMContextImpl implements RMContext {
   private SystemMetricsPublisher systemMetricsPublisher;
   private LeaderElectorService elector;
 
+  private QueueLimitCalculator queueLimitCalculator;
+
   /**
    * Default constructor. To be used in conjunction with setter methods for
    * individual fields.
@@ -472,4 +476,14 @@ public class RMContextImpl implements RMContext {
   public void setQueuePlacementManager(PlacementManager placementMgr) {
     this.activeServiceContext.setQueuePlacementManager(placementMgr);
   }
+
+  @Override
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return this.queueLimitCalculator;
+  }
+
+  public void setContainerQueueLimitCalculator(
+      QueueLimitCalculator limitCalculator) {
+    this.queueLimitCalculator = limitCalculator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 6c80a58..f9d3325 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1154,6 +1154,8 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
       addService(distSchedulerEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
           distSchedulerEventDispatcher);
+      this.rmContext.setContainerQueueLimitCalculator(
+          distributedSchedulingService.getNodeManagerQueueLimitCalculator());
       return distributedSchedulingService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index b0bc565..d306b60 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -536,6 +536,13 @@ public class ResourceTrackerService extends 
AbstractService implements
       }
     }
 
+    // 6. Send Container Queuing Limits back to the Node. This will be used by
+    //    the node to truncate the number of Containers queued for execution.
+    if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
+      nodeHeartBeatResponse.setContainerQueuingLimit(
+          this.rmContext.getNodeManagerQueueLimitCalculator()
+              .createContainerQueuingLimit());
+    }
     return nodeHeartBeatResponse;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
new file mode 100644
index 0000000..21f4f6e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
+ * and total wait time) associated with Container Queues on the Node Manager.
+ * It uses this information to periodically sort the Nodes from least to most
+ * loaded.
+ */
+public class NodeQueueLoadMonitor implements ClusterMonitor {
+
+  final static Log LOG = LogFactory.getLog(NodeQueueLoadMonitor.class);
+
+  /**
+   * The comparator used to specify the metric against which the load
+   * of two Nodes are compared.
+   */
+  public enum LoadComparator implements Comparator<ClusterNode> {
+    QUEUE_LENGTH,
+    QUEUE_WAIT_TIME;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      if (getMetric(o1) == getMetric(o2)) {
+        return o1.timestamp < o2.timestamp ? +1 : -1;
+      }
+      return getMetric(o1) > getMetric(o2) ? +1 : -1;
+    }
+
+    public int getMetric(ClusterNode c) {
+      return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
+    }
+  }
+
+  static class ClusterNode {
+    int queueLength = 0;
+    int queueWaitTime = -1;
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setQueueLength(int qLength) {
+      this.queueLength = qLength;
+      return this;
+    }
+
+    public ClusterNode setQueueWaitTime(int wTime) {
+      this.queueWaitTime = wTime;
+      return this;
+    }
+
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final ScheduledExecutorService scheduledExecutor;
+
+  private final List<NodeId> sortedNodes;
+  private final Map<NodeId, ClusterNode> clusterNodes =
+      new ConcurrentHashMap<>();
+  private final LoadComparator comparator;
+  private QueueLimitCalculator thresholdCalculator;
+
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (sortedNodes) {
+        sortedNodes.clear();
+        sortedNodes.addAll(sortNodes());
+        if (thresholdCalculator != null) {
+          thresholdCalculator.update();
+        }
+      }
+    }
+  };
+
+  @VisibleForTesting
+  NodeQueueLoadMonitor(LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  public NodeQueueLoadMonitor(long nodeComputationInterval,
+      LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  List<NodeId> getSortedNodes() {
+    return sortedNodes;
+  }
+
+  public QueueLimitCalculator getThresholdCalculator() {
+    return thresholdCalculator;
+  }
+
+  Map<NodeId, ClusterNode> getClusterNodes() {
+    return clusterNodes;
+  }
+
+  Comparator<ClusterNode> getComparator() {
+    return comparator;
+  }
+
+  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) 
{
+    this.thresholdCalculator =
+        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
+  }
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
+        this.clusterNodes.remove(removedRMNode.getNodeID());
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  @Override
+  public void updateNode(RMNode rmNode) {
+    LOG.debug("Node update event from: " + rmNode.getNodeID());
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
+    // UNLESS comparator is based on queue length.
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (currentNode == null) {
+        if (estimatedQueueWaitTime != -1
+            || comparator == LoadComparator.QUEUE_LENGTH) {
+          this.clusterNodes.put(rmNode.getNodeID(),
+              new ClusterNode(rmNode.getNodeID())
+                  .setQueueWaitTime(estimatedQueueWaitTime)
+                  .setQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else {
+        if (estimatedQueueWaitTime != -1
+            || comparator == LoadComparator.QUEUE_LENGTH) {
+          currentNode
+              .setQueueWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
+              .updateTimestamp();
+          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          this.clusterNodes.remove(rmNode.getNodeID());
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + currentNode.queueWaitTime + "] and " +
+              "wait queue length [" + currentNode.queueLength + "]");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) 
{
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently.
+  }
+
+  /**
+   * Returns all Node Ids as ordered list from Least to Most Loaded.
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectNodes() {
+    return selectLeastLoadedNodes(-1);
+  }
+
+  /**
+   * Returns 'K' of the least Loaded Node Ids as ordered list.
+   * @param k max number of nodes to return
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectLeastLoadedNodes(int k) {
+    synchronized (this.sortedNodes) {
+      return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+          new ArrayList<>(this.sortedNodes).subList(0, k) :
+          new ArrayList<>(this.sortedNodes);
+    }
+  }
+
+  private List<NodeId> sortNodes() {
+    synchronized (this.clusterNodes) {
+      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
+      List<NodeId> retList = new ArrayList<>();
+      Object[] nodes = aList.toArray();
+      // Collections.sort would do something similar by calling Arrays.sort
+      // internally but would finally iterate through the input list (aList)
+      // to reset the value of each element. Since we don't really care about
+      // 'aList', we can use the iteration to create the list of nodeIds which
+      // is what we ultimately care about.
+      Arrays.sort(nodes, (Comparator)comparator);
+      for (int j=0; j < nodes.length; j++) {
+        retList.add(((ClusterNode)nodes[j]).nodeId);
+      }
+      return retList;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
new file mode 100644
index 0000000..ab3a577
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class interacts with the NodeQueueLoadMonitor to keep track of the
+ * mean and standard deviation of the configured metrics (queue length or queue
+ * wait time) used to characterize the queue load of a specific node.
+ * The NodeQueueLoadMonitor triggers an update (by calling the
+ * <code>update()</code> method) every time it performs a re-ordering of
+ * all nodes.
+ */
+public class QueueLimitCalculator {
+
+  class Stats {
+    private final AtomicInteger mean = new AtomicInteger(0);
+    private final AtomicInteger stdev = new AtomicInteger(0);
+
+    /**
+     * Not thread safe. Caller should synchronize on sorted nodes list.
+     */
+    void update() {
+      List<NodeId> sortedNodes = nodeSelector.getSortedNodes();
+      if (sortedNodes.size() > 0) {
+        // Calculate mean
+        int sum = 0;
+        for (NodeId n : sortedNodes) {
+          sum += getMetric(getNode(n));
+        }
+        mean.set(sum / sortedNodes.size());
+
+        // Calculate stdev
+        int sqrSumMean = 0;
+        for (NodeId n : sortedNodes) {
+          int val = getMetric(getNode(n));
+          sqrSumMean += Math.pow(val - mean.get(), 2);
+        }
+        stdev.set(
+            (int) Math.round(Math.sqrt(
+                sqrSumMean / (float) sortedNodes.size())));
+      }
+    }
+
+    private ClusterNode getNode(NodeId nId) {
+      return nodeSelector.getClusterNodes().get(nId);
+    }
+
+    private int getMetric(ClusterNode cn) {
+      return (cn != null) ? ((LoadComparator)nodeSelector.getComparator())
+              .getMetric(cn) : 0;
+    }
+
+    public int getMean() {
+      return mean.get();
+    }
+
+    public int getStdev() {
+      return stdev.get();
+    }
+  }
+
+  private final NodeQueueLoadMonitor nodeSelector;
+  private final float sigma;
+  private final int rangeMin;
+  private final int rangeMax;
+  private final Stats stats = new Stats();
+
+  QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma,
+      int rangeMin, int rangeMax) {
+    this.nodeSelector = selector;
+    this.sigma = sigma;
+    this.rangeMax = rangeMax;
+    this.rangeMin = rangeMin;
+  }
+
+  private int determineThreshold() {
+    return (int) (stats.getMean() + sigma * stats.getStdev());
+  }
+
+  void update() {
+    this.stats.update();
+  }
+
+  private int getThreshold() {
+    int thres = determineThreshold();
+    return Math.min(rangeMax, Math.max(rangeMin, thres));
+  }
+
+  public ContainerQueuingLimit createContainerQueuingLimit() {
+    ContainerQueuingLimit containerQueuingLimit =
+        ContainerQueuingLimit.newInstance();
+    if (nodeSelector.getComparator() == LoadComparator.QUEUE_WAIT_TIME) {
+      containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold());
+      containerQueuingLimit.setMaxQueueLength(-1);
+    } else {
+      containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
+      containerQueuingLimit.setMaxQueueLength(getThreshold());
+    }
+    return containerQueuingLimit;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to