Repository: hadoop
Updated Branches:
  refs/heads/yarn-2877 b00875e60 -> 2340511f7


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
new file mode 100644
index 0000000..5aedbed
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TopKNodeSelector implements ClusterMonitor, NodeSelector {
+
+  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+
+  enum TopKComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      if (getQuant(o1) == getQuant(o2)) {
+        return o1.timestamp < o2.timestamp ? +1 : -1;
+      }
+      return getQuant(o1) > getQuant(o2) ? +1 : -1;
+    }
+
+    private int getQuant(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    }
+  }
+
+  static class ClusterNode {
+    int queueTime = -1;
+    int waitQueueLength = 0;
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+
+    public ClusterNode setWaitQueueLength(int queueLength) {
+      this.waitQueueLength = queueLength;
+      return this;
+    }
+
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final int k;
+  private final List<NodeId> topKNodes;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
+  private final Comparator<ClusterNode> comparator;
+
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (topKNodes) {
+        topKNodes.clear();
+        topKNodes.addAll(computeTopKNodes());
+      }
+    }
+  };
+
+  @VisibleForTesting
+  TopKNodeSelector(int k, TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  public TopKNodeSelector(int k, long nodeComputationInterval,
+      TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently : atleast one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
+        this.clusterNodes.remove(removedRMNode.getNodeID());
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    LOG.debug("Node update event from: " + rmNode.getNodeID());
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
+    // UNLESS comparator is based on queue length, in which case, we should add
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (currentNode == null) {
+        if (estimatedQueueWaitTime != -1
+            || comparator == TopKComparator.QUEUE_LENGTH) {
+          this.clusterNodes.put(rmNode.getNodeID(),
+              new ClusterNode(rmNode.getNodeID())
+                  .setQueueTime(estimatedQueueWaitTime)
+                  .setWaitQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else {
+        if (estimatedQueueWaitTime != -1
+            || comparator == TopKComparator.QUEUE_LENGTH) {
+          currentNode
+              .setQueueTime(estimatedQueueWaitTime)
+              .setWaitQueueLength(waitQueueLength)
+              .updateTimestamp();
+          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          this.clusterNodes.remove(rmNode.getNodeID());
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + currentNode.queueTime + "] and " +
+              "wait queue length [" + currentNode.waitQueueLength + "]");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) 
{
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently...
+  }
+
+  @Override
+  public List<NodeId> selectNodes() {
+    synchronized (this.topKNodes) {
+      return this.k < this.topKNodes.size() ?
+          new ArrayList<>(this.topKNodes).subList(0, this.k) :
+          new ArrayList<>(this.topKNodes);
+    }
+  }
+
+  @Override
+  public List<NodeId> selectNodes(Collection<SelectionHint> hints) {
+    List<NodeId> retList = selectNodes();
+    Set<NodeId> alreadyAdded = new HashSet<>(retList);
+    TreeSet<ClusterNode> toAdd = new TreeSet<>(this.comparator);
+    synchronized (this.clusterNodes) {
+      for (SelectionHint hint : hints) {
+        // Sort the nodes in the criteria (We need the best nodes)
+        PriorityQueue<ClusterNode> temp =
+            new PriorityQueue<>(hint.getNodeIds().length, this.comparator);
+        for (NodeId n : hint.getNodeIds()) {
+          if (!alreadyAdded.contains(n) &&
+              !toAdd.contains(clusterNodes.get(n))) {
+            temp.add(clusterNodes.get(n));
+          }
+        }
+        // From the Sorted list, select the 'minToInclude' best nodes
+        int numIncluded = 0;
+        while (!temp.isEmpty()) {
+          if (numIncluded < hint.getMinToInclude()) {
+            ClusterNode cn = temp.remove();
+            toAdd.add(cn);
+            alreadyAdded.add(cn.nodeId);
+            numIncluded++;
+          } else {
+            break;
+          }
+        }
+      }
+    }
+
+    if (toAdd.size() > 0) {
+      ArrayList<NodeId> newList = new ArrayList<>();
+      for (ClusterNode cn : toAdd) {
+        newList.add(cn.nodeId);
+      }
+      newList.addAll(retList);
+      retList = newList;
+    }
+    return retList;
+  }
+
+  private List<NodeId> computeTopKNodes() {
+    synchronized (this.clusterNodes) {
+      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
+      List<NodeId> retList = new ArrayList<>();
+      Object[] nodes = aList.toArray();
+      // Collections.sort would do something similar by calling Arrays.sort
+      // internally but would finally iterate through the input list (aList)
+      // to reset the value of each element.. Since we don't really care about
+      // 'aList', we can use the iteration to create the list of nodeIds which
+      // is what we ultimately care about.
+      Arrays.sort(nodes, (Comparator)comparator);
+      for (int j=0; j < nodes.length; j++) {
+        retList.add(((ClusterNode)nodes[j]).nodeId);
+      }
+      return retList;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..f5b61a3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -260,6 +260,10 @@ public class MockNodes {
     public ResourceUtilization getNodeUtilization() {
       return this.nodeUtilization;
     }
+
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6182b07..9b4e1c7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -90,6 +90,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+    .DistributedSchedulingService;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 3fa377e..c45fba8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,10 +169,11 @@ public class TestApplicationCleanup {
     MockRM rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
deleted file mode 100644
index 262fd5a..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
-    .AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
-public class TestDistributedSchedulingService {
-
-  // Test if the DistributedSchedulingService can handle both DSProtocol as
-  // well as AMProtocol clients
-  @Test
-  public void testRPCWrapping() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
-    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
-    final RMContext rmContext = new RMContextImpl() {
-      @Override
-      public AMLivelinessMonitor getAMLivelinessMonitor() {
-        return null;
-      }
-    };
-    DistributedSchedulingService service =
-        new DistributedSchedulingService(rmContext, null) {
-          @Override
-          public RegisterApplicationMasterResponse registerApplicationMaster
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
-                RegisterApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setQueue("dummyQueue");
-            return resp;
-          }
-
-          @Override
-          public FinishApplicationMasterResponse finishApplicationMaster
-              (FinishApplicationMasterRequest request) throws YarnException,
-              IOException {
-            FinishApplicationMasterResponse resp = factory.newRecordInstance(
-                FinishApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setIsUnregistered(false);
-            return resp;
-          }
-
-          @Override
-          public AllocateResponse allocate(AllocateRequest request) throws
-              YarnException, IOException {
-            AllocateResponse response = factory.newRecordInstance
-                (AllocateResponse.class);
-            response.setNumClusterNodes(12345);
-            return response;
-          }
-
-          @Override
-          public DistSchedRegisterResponse
-          registerApplicationMasterForDistributedScheduling
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            DistSchedRegisterResponse resp = factory.newRecordInstance(
-                DistSchedRegisterResponse.class);
-            resp.setContainerIdStart(54321l);
-            return resp;
-          }
-
-          @Override
-          public DistSchedAllocateResponse allocateForDistributedScheduling
-              (AllocateRequest request) throws YarnException, IOException {
-            DistSchedAllocateResponse resp =
-                factory.newRecordInstance(DistSchedAllocateResponse.class);
-            resp.setNodesForScheduling(
-                Arrays.asList(NodeId.newInstance("h1", 1234)));
-            return resp;
-          }
-        };
-    Server server = service.getServer(rpc, conf, addr, null);
-    server.start();
-
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
-    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
-        ProtobufRpcEngine.class);
-    ApplicationMasterProtocol ampProxy =
-        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
-    RegisterApplicationMasterResponse regResp = 
ampProxy.registerApplicationMaster(
-            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
-    Assert.assertEquals("dummyQueue", regResp.getQueue());
-    FinishApplicationMasterResponse finishResp = ampProxy
-        .finishApplicationMaster(factory.newRecordInstance(
-            FinishApplicationMasterRequest.class));
-    Assert.assertEquals(false, finishResp.getIsUnregistered());
-    AllocateResponse allocResp = ampProxy
-        .allocate(factory.newRecordInstance(AllocateRequest.class));
-    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
-    // Verify that the DistrubutedSchedulingService can handle the
-    // DistributedSchedulerProtocol clients as well
-    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
-        ProtobufRpcEngine.class);
-    DistributedSchedulerProtocol dsProxy =
-        (DistributedSchedulerProtocol) 
rpc.getProxy(DistributedSchedulerProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
-
-    DistSchedRegisterResponse dsRegResp =
-        dsProxy.registerApplicationMasterForDistributedScheduling(
-        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
-    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
-    DistSchedAllocateResponse dsAllocResp =
-        dsProxy.allocateForDistributedScheduling(
-            factory.newRecordInstance(AllocateRequest.class));
-    Assert.assertEquals(
-        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index d9306dd..429817e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import 
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -44,8 +44,8 @@ public class TestRMDispatcher {
     AsyncDispatcher rmDispatcher = new AsyncDispatcher();
     CapacityScheduler sched = spy(new CapacityScheduler());
     YarnConfiguration conf = new YarnConfiguration();
-    SchedulerEventDispatcher schedulerDispatcher =
-        new SchedulerEventDispatcher(sched);
+    EventDispatcher schedulerDispatcher =
+        new EventDispatcher(sched, sched.getClass().getName());
     rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
     rmDispatcher.init(conf);
     rmDispatcher.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e0fd9ab..119ae09 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -987,7 +988,8 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
             scheduler.handle(event);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f4cb3b3..458f94d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
       }
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 5035afe..e5ba470 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -423,10 +424,11 @@ public class TestAMRestart {
     MockRM rm1 = new MockRM(conf, memStore) {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
new file mode 100644
index 0000000..35ab6a9
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
+    .AMLivelinessMonitor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+    .DistributedSchedulingService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+public class TestDistributedSchedulingService {
+
+  // Test if the DistributedSchedulingService can handle both DSProtocol as
+  // well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
+    };
+    DistributedSchedulingService service =
+        new DistributedSchedulingService(rmContext, null) {
+          @Override
+          public RegisterApplicationMasterResponse registerApplicationMaster
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+                RegisterApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setQueue("dummyQueue");
+            return resp;
+          }
+
+          @Override
+          public FinishApplicationMasterResponse finishApplicationMaster
+              (FinishApplicationMasterRequest request) throws YarnException,
+              IOException {
+            FinishApplicationMasterResponse resp = factory.newRecordInstance(
+                FinishApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setIsUnregistered(false);
+            return resp;
+          }
+
+          @Override
+          public AllocateResponse allocate(AllocateRequest request) throws
+              YarnException, IOException {
+            AllocateResponse response = factory.newRecordInstance
+                (AllocateResponse.class);
+            response.setNumClusterNodes(12345);
+            return response;
+          }
+
+          @Override
+          public DistSchedRegisterResponse
+          registerApplicationMasterForDistributedScheduling
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            DistSchedRegisterResponse resp = factory.newRecordInstance(
+                DistSchedRegisterResponse.class);
+            resp.setContainerIdStart(54321l);
+            return resp;
+          }
+
+          @Override
+          public DistSchedAllocateResponse allocateForDistributedScheduling
+              (AllocateRequest request) throws YarnException, IOException {
+            DistSchedAllocateResponse resp =
+                factory.newRecordInstance(DistSchedAllocateResponse.class);
+            resp.setNodesForScheduling(
+                Arrays.asList(NodeId.newInstance("h1", 1234)));
+            return resp;
+          }
+        };
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the DistrubutedSchedulingService can handle vanilla
+    // ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocol ampProxy =
+        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp = 
ampProxy.registerApplicationMaster(
+            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp = ampProxy
+        .finishApplicationMaster(factory.newRecordInstance(
+            FinishApplicationMasterRequest.class));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp = ampProxy
+        .allocate(factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the DistrubutedSchedulingService can handle the
+    // DistributedSchedulerProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulerProtocol dsProxy =
+        (DistributedSchedulerProtocol) 
rpc.getProxy(DistributedSchedulerProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+
+    DistSchedRegisterResponse dsRegResp =
+        dsProxy.registerApplicationMasterForDistributedScheduling(
+        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+    DistSchedAllocateResponse dsAllocResp =
+        dsProxy.allocateForDistributedScheduling(
+            factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
new file mode 100644
index 0000000..a21ae19
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestTopKNodeSelector {
+
+  static class FakeNodeId extends NodeId {
+    final String host;
+    final int port;
+
+    public FakeNodeId(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getPort() {
+      return port;
+    }
+
+    @Override
+    protected void setHost(String host) {}
+    @Override
+    protected void setPort(int port) {}
+    @Override
+    protected void build() {}
+
+    @Override
+    public String toString() {
+      return host + ":" + port;
+    }
+  }
+
+  @Test
+  public void testQueueTimeSort() {
+    TopKNodeSelector selector = new TopKNodeSelector(5,
+        TopKNodeSelector.TopKComparator.WAIT_TIME);
+    selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
+    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+    selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time
+    selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+  }
+
+  @Test
+  public void testSelectionHints() {
+    TopKNodeSelector selector = new TopKNodeSelector(3,
+        TopKNodeSelector.TopKComparator.WAIT_TIME);
+    selector.nodeUpdate(createRMNode("h1", 1, 100, 10));
+    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+    selector.nodeUpdate(createRMNode("h3", 3, 95, 10));
+    selector.nodeUpdate(createRMNode("h4", 4, 10, 10));
+    selector.nodeUpdate(createRMNode("h5", 5, 90, 10));
+    selector.nodeUpdate(createRMNode("h6", 6, 15, 10));
+    selector.computeTask.run();
+
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(2).toString());
+
+    nodeIds = selector.selectNodes(
+        Arrays.asList(
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h1", 1)
+            ), 1),
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h3", 3)
+            ), 1)
+        ));
+    System.out.println("2-> " + nodeIds);
+    // Enure the hinted nodes are on the top
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+
+    nodeIds = selector.selectNodes(
+        Arrays.asList(
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h1", 1),
+                new FakeNodeId("h3", 3),
+                new FakeNodeId("h5", 5)
+            ), 2)
+        ));
+    System.out.println("3-> " + nodeIds);
+    // Enure the hinted nodes are on the top
+    Assert.assertEquals("h5:5", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+  }
+
+  @Test
+  public void testQueueLengthSort() {
+    TopKNodeSelector selector = new TopKNodeSelector(5,
+        TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
+    selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
+    selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
+    selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time but valid length
+    selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength) {
+    RMNode node1 = Mockito.mock(RMNode.class);
+    NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getNodeID()).thenReturn(nID1);
+    QueuedContainersStatus status1 =
+        Mockito.mock(QueuedContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime())
+        .thenReturn(waitTime);
+    Mockito.when(status1.getWaitQueueLength())
+        .thenReturn(queueLength);
+    Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
+    return node1;
+  }
+}

Reply via email to