Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 487c920bb -> cc0c966a4


YARN-7587. Skip dispatching opportunistic containers to nodes whose queue is 
already full. (Weiwei Yang via asuresh)

(cherry picked from commit 37ca4169508c3003dbe9044fefd37eb8cd8c0503)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc0c966a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc0c966a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc0c966a

Branch: refs/heads/branch-2.9
Commit: cc0c966a4e0a8f4e82b65a571d247bfc4f3d96a7
Parents: 487c920
Author: Arun Suresh <asur...@apache.org>
Authored: Sun Dec 3 22:22:01 2017 -0800
Committer: Arun Suresh <asur...@apache.org>
Committed: Sun Dec 3 22:25:06 2017 -0800

----------------------------------------------------------------------
 .../records/OpportunisticContainersStatus.java  | 19 +++++++++++++++++
 .../pb/OpportunisticContainersStatusPBImpl.java | 13 ++++++++++++
 .../main/proto/yarn_server_common_protos.proto  |  1 +
 .../scheduler/ContainerScheduler.java           | 12 +++++++++++
 .../distributed/NodeQueueLoadMonitor.java       | 22 ++++++++++++++++++--
 .../distributed/TestNodeQueueLoadMonitor.java   | 21 +++++++++++++++++++
 6 files changed, 86 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
index 732db2a..c8a81a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
@@ -149,4 +149,23 @@ public abstract class OpportunisticContainersStatus {
   @Unstable
   public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
 
+
+  /**
+   * Gets the capacity of the opportunistic containers queue on the node.
+   *
+   * @return queue capacity.
+   */
+  @Private
+  @Unstable
+  public abstract int getOpportQueueCapacity();
+
+
+  /**
+   * Sets the capacity of the opportunistic containers queue on the node.
+   *
+   * @param queueCapacity queue capacity.
+   */
+  @Private
+  @Unstable
+  public abstract void setOpportQueueCapacity(int queueCapacity);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
index 8399713..5d1005c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
@@ -136,4 +136,17 @@ public class OpportunisticContainersStatusPBImpl
     maybeInitBuilder();
     builder.setEstimatedQueueWaitTime(queueWaitTime);
   }
+
+  @Override
+  public int getOpportQueueCapacity() {
+    YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getOpportQueueCapacity();
+  }
+
+  @Override
+  public void setOpportQueueCapacity(int maxOpportQueueLength) {
+    maybeInitBuilder();
+    builder.setOpportQueueCapacity(maxOpportQueueLength);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 98b172d..8200808 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto {
   optional int32 queued_opport_containers = 4;
   optional int32 wait_queue_length = 5;
   optional int32 estimated_queue_wait_time = 6;
+  optional int32 opport_queue_capacity = 7;
 }
 
 message MasterKeyProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 2c28985..a8c0e5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -68,6 +68,7 @@ public class ContainerScheduler extends AbstractService implements
       LoggerFactory.getLogger(ContainerScheduler.class);
 
   private final Context context;
+  // Capacity of the queue for opportunistic Containers.
   private final int maxOppQueueLength;
 
   // Queue of Guaranteed Containers waiting for resources to run
@@ -258,6 +259,15 @@ public class ContainerScheduler extends AbstractService implements
         + this.queuedOpportunisticContainers.size();
   }
 
+  /**
+   * Return the capacity of the queue for opportunistic containers
+   * on this node.
+   * @return queue capacity.
+   */
+  public int getOpportunisticQueueCapacity() {
+    return this.maxOppQueueLength;
+  }
+
   @VisibleForTesting
   public int getNumQueuedGuaranteedContainers() {
     return this.queuedGuaranteedContainers.size();
@@ -290,6 +300,8 @@ public class ContainerScheduler extends AbstractService implements
         metrics.getAllocatedOpportunisticVCores());
     this.opportunisticContainersStatus.setRunningOpportContainers(
         metrics.getRunningOpportunisticContainers());
+    this.opportunisticContainersStatus.setOpportQueueCapacity(
+        getOpportunisticQueueCapacity());
     return this.opportunisticContainersStatus;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index ed0ee1e..e989099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -75,6 +75,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     int queueWaitTime = -1;
     double timestamp;
     final NodeId nodeId;
+    private int queueCapacity = 0;
 
     public ClusterNode(NodeId nodeId) {
       this.nodeId = nodeId;
@@ -95,6 +96,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       this.timestamp = System.currentTimeMillis();
       return this;
     }
+
+    public ClusterNode setQueueCapacity(int capacity) {
+      this.queueCapacity = capacity;
+      return this;
+    }
+
+    public boolean isQueueFull() {
+      return this.queueCapacity > 0 &&
+          this.queueLength >= this.queueCapacity;
+    }
   }
 
   private final ScheduledExecutorService scheduledExecutor;
@@ -207,6 +218,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       opportunisticContainersStatus =
           OpportunisticContainersStatus.newInstance();
     }
+    int opportQueueCapacity =
+        opportunisticContainersStatus.getOpportQueueCapacity();
     int estimatedQueueWaitTime =
         opportunisticContainersStatus.getEstimatedQueueWaitTime();
     int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
@@ -222,7 +235,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
           this.clusterNodes.put(rmNode.getNodeID(),
               new ClusterNode(rmNode.getNodeID())
                   .setQueueWaitTime(estimatedQueueWaitTime)
-                  .setQueueLength(waitQueueLength));
+                  .setQueueLength(waitQueueLength)
+                  .setQueueCapacity(opportQueueCapacity));
           LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
@@ -301,7 +315,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       // is what we ultimately care about.
       Arrays.sort(nodes, (Comparator)comparator);
       for (int j=0; j < nodes.length; j++) {
-        retList.add(((ClusterNode)nodes[j]).nodeId);
+        ClusterNode cNode = (ClusterNode)nodes[j];
+        // Exclude nodes whose queue is already full.
+        if (!cNode.isQueueFull()) {
+          retList.add(cNode.nodeId);
+        }
       }
       return retList;
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc0c966a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index dfd21ff..85eddaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -33,6 +33,8 @@ import java.util.List;
  */
 public class TestNodeQueueLoadMonitor {
 
+  private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
+
   static class FakeNodeId extends NodeId {
     final String host;
     final int port;
@@ -132,6 +134,17 @@ public class TestNodeQueueLoadMonitor {
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+    // Now update h3 and fill its queue.
+    selector.updateNode(createRMNode("h3", 3, -1,
+        DEFAULT_MAX_QUEUE_LENGTH));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("4-> "+ nodeIds);
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
   }
 
   @Test
@@ -180,6 +193,12 @@ public class TestNodeQueueLoadMonitor {
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
+    return createRMNode(host, port, waitTime, queueLength,
+        DEFAULT_MAX_QUEUE_LENGTH);
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength, int queueCapacity) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
@@ -189,6 +208,8 @@ public class TestNodeQueueLoadMonitor {
         .thenReturn(waitTime);
     Mockito.when(status1.getWaitQueueLength())
         .thenReturn(queueLength);
+    Mockito.when(status1.getOpportQueueCapacity())
+        .thenReturn(queueCapacity);
     Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
     return node1;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to