Repository: hadoop Updated Branches: refs/heads/yarn-2877 b00875e60 -> 2340511f7
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java new file mode 100644 index 0000000..5aedbed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks per-node queue statistics reported through node heartbeats and
+ * periodically recomputes an ordering of the cluster's nodes, least loaded
+ * first, from which callers can select the top {@code k} nodes.
+ *
+ * <p>{@link #clusterNodes} and {@link #topKNodes} are each guarded by their
+ * own monitor; no method holds both monitors at the same time.
+ */
+public class TopKNodeSelector implements ClusterMonitor, NodeSelector {
+
+  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+
+  /**
+   * Orders {@link ClusterNode}s by load: smaller estimated queue wait time
+   * (or shorter wait queue) first. Equally loaded nodes are ordered by most
+   * recent update first, and finally by node id so that the ordering is
+   * total and {@code compare} returns 0 only for the same node id, as sorted
+   * collections and {@code Arrays.sort} require.
+   */
+  enum TopKComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      int byLoad = Integer.compare(getQuant(o1), getQuant(o2));
+      if (byLoad != 0) {
+        return byLoad;
+      }
+      // Same load: the more recently updated node sorts first.
+      int byRecency = Double.compare(o2.timestamp, o1.timestamp);
+      if (byRecency != 0) {
+        return byRecency;
+      }
+      // Deterministic tie-break; yields 0 only when both refer to one node.
+      return o1.nodeId.compareTo(o2.nodeId);
+    }
+
+    private int getQuant(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    }
+  }
+
+  /** Mutable snapshot of the queue statistics last reported by one node. */
+  static class ClusterNode {
+    int queueTime = -1;
+    int waitQueueLength = 0;
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+
+    public ClusterNode setWaitQueueLength(int queueLength) {
+      this.waitQueueLength = queueLength;
+      return this;
+    }
+
+    /** Stamps this node with the current wall-clock time in millis. */
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final int k;
+  private final List<NodeId> topKNodes;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
+  private final Comparator<ClusterNode> comparator;
+
+  /**
+   * Recomputes the node ordering and publishes it into {@link #topKNodes}.
+   * The ordering is computed before the topKNodes monitor is taken so that
+   * the two monitors are never held together.
+   */
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      List<NodeId> recomputed = computeTopKNodes();
+      synchronized (topKNodes) {
+        topKNodes.clear();
+        topKNodes.addAll(recomputed);
+      }
+    }
+  };
+
+  @VisibleForTesting
+  TopKNodeSelector(int k, TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  /**
+   * @param k maximum number of nodes returned by {@link #selectNodes()}
+   * @param nodeComputationInterval delay before the first recomputation and
+   *          period between recomputations, in milliseconds
+   * @param comparator load metric used to order the nodes
+   */
+  public TopKNodeSelector(int k, long nodeComputationInterval,
+      TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    // NOTE(review): nothing in this class shuts the executor down.
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses,
+      RMNode rmNode) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    }
+    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node delete event for: "
+          + removedRMNode.getNode().getName());
+    }
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.remove(removedRMNode.getNodeID()) != null) {
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  /**
+   * Records the queue statistics carried by a node heartbeat. A node that
+   * reports an estimated queue wait time of -1 is not tracked (and is dropped
+   * if it was tracked) unless nodes are ordered by queue length.
+   */
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    NodeId nodeId = rmNode.getNodeID();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node update event from: " + nodeId);
+    }
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    if (queuedContainersStatus == null) {
+      // No queue statistics in this heartbeat; keep what is already known
+      // instead of failing with a NullPointerException.
+      LOG.debug("No queued containers status reported, ignoring update");
+      return;
+    }
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    boolean eligible = estimatedQueueWaitTime != -1
+        || comparator == TopKComparator.QUEUE_LENGTH;
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(nodeId);
+      if (currentNode == null) {
+        if (eligible) {
+          this.clusterNodes.put(nodeId,
+              new ClusterNode(nodeId)
+                  .setQueueTime(estimatedQueueWaitTime)
+                  .setWaitQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + nodeId + "] "
+              + "with queue wait time [" + estimatedQueueWaitTime + "] and "
+              + "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + nodeId + "] "
+              + "with queue wait time [" + estimatedQueueWaitTime + "] and "
+              + "wait queue length [" + waitQueueLength + "]");
+        }
+      } else if (eligible) {
+        currentNode
+            .setQueueTime(estimatedQueueWaitTime)
+            .setWaitQueueLength(waitQueueLength)
+            .updateTimestamp();
+        LOG.info("Updating ClusterNode [" + nodeId + "] "
+            + "with queue wait time [" + estimatedQueueWaitTime + "] and "
+            + "wait queue length [" + waitQueueLength + "]");
+      } else {
+        this.clusterNodes.remove(nodeId);
+        LOG.info("Deleting ClusterNode [" + nodeId + "] "
+            + "with queue wait time [" + currentNode.queueTime + "] and "
+            + "wait queue length [" + currentNode.waitQueueLength + "]");
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode,
+      ResourceOption resourceOption) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    }
+    // Ignoring this currently...
+  }
+
+  /**
+   * @return a new list holding at most {@code k} node ids from the most
+   *         recently computed ordering, best node first
+   */
+  @Override
+  public List<NodeId> selectNodes() {
+    synchronized (this.topKNodes) {
+      int count = Math.min(this.k, this.topKNodes.size());
+      return new ArrayList<>(this.topKNodes.subList(0, count));
+    }
+  }
+
+  /**
+   * Selects nodes as {@link #selectNodes()} does and additionally places, in
+   * front of them, up to {@code minToInclude} of the best tracked nodes named
+   * by each hint that are not already selected. Hinted nodes that are not
+   * currently tracked are skipped.
+   *
+   * @param hints node preferences to honour; may be null or empty
+   * @return the hinted nodes, best first, followed by the top-k nodes
+   */
+  @Override
+  public List<NodeId> selectNodes(Collection<SelectionHint> hints) {
+    List<NodeId> retList = selectNodes();
+    if (hints == null || hints.isEmpty()) {
+      return retList;
+    }
+    Set<NodeId> alreadyAdded = new HashSet<>(retList);
+    TreeSet<ClusterNode> toAdd = new TreeSet<>(this.comparator);
+    synchronized (this.clusterNodes) {
+      for (SelectionHint hint : hints) {
+        NodeId[] hintedIds = hint.getNodeIds();
+        if (hintedIds == null || hintedIds.length == 0) {
+          // PriorityQueue rejects an initial capacity smaller than 1.
+          continue;
+        }
+        // Rank the tracked, not yet selected nodes named by this hint.
+        PriorityQueue<ClusterNode> candidates =
+            new PriorityQueue<>(hintedIds.length, this.comparator);
+        for (NodeId n : hintedIds) {
+          ClusterNode cn = this.clusterNodes.get(n);
+          if (cn != null && !alreadyAdded.contains(n)) {
+            candidates.add(cn);
+          }
+        }
+        // Take the 'minToInclude' best of them; alreadyAdded.add() also
+        // filters a node id listed more than once in the same hint.
+        int numIncluded = 0;
+        while (numIncluded < hint.getMinToInclude()
+            && !candidates.isEmpty()) {
+          ClusterNode cn = candidates.remove();
+          if (alreadyAdded.add(cn.nodeId)) {
+            toAdd.add(cn);
+            numIncluded++;
+          }
+        }
+      }
+    }
+
+    if (!toAdd.isEmpty()) {
+      List<NodeId> newList = new ArrayList<>(toAdd.size() + retList.size());
+      for (ClusterNode cn : toAdd) {
+        newList.add(cn.nodeId);
+      }
+      newList.addAll(retList);
+      retList = newList;
+    }
+    return retList;
+  }
+
+  /**
+   * @return the ids of all tracked nodes, sorted best first by the
+   *         configured comparator
+   */
+  private List<NodeId> computeTopKNodes() {
+    synchronized (this.clusterNodes) {
+      // Sort a typed array copy so that the result list can be filled in a
+      // single pass over the sorted nodes.
+      ClusterNode[] nodes = this.clusterNodes.values()
+          .toArray(new ClusterNode[this.clusterNodes.size()]);
+      Arrays.sort(nodes, this.comparator);
+      List<NodeId> retList = new ArrayList<>(nodes.length);
+      for (ClusterNode node : nodes) {
+        retList.add(node.nodeId);
+      }
+      return retList;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..f5b61a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -260,6 +260,10 @@ public class MockNodes { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + public QueuedContainersStatus getQueuedContainersStatus() { + return null; + } }; private static RMNode buildRMNode(int rack, final Resource perNode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 6182b07..9b4e1c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + 
.DistributedSchedulingService; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 3fa377e..c45fba8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -167,10 +169,11 @@ public class TestApplicationCleanup { MockRM rm = new MockRM() { @Override 
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { - scheduler.handle(event); + super.handle(event); } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java deleted file mode 100644 index 262fd5a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; -import 
org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt - .AMLivelinessMonitor; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; - -public class TestDistributedSchedulingService { - - // Test if the DistributedSchedulingService can handle both DSProtocol as - // well as AMProtocol clients - @Test - public void testRPCWrapping() throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class - .getName()); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr); - final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null); - final RMContext rmContext = new RMContextImpl() { - @Override - public AMLivelinessMonitor getAMLivelinessMonitor() { - return null; - } - }; - DistributedSchedulingService service = - new DistributedSchedulingService(rmContext, null) { - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws - YarnException, IOException { - RegisterApplicationMasterResponse resp = factory.newRecordInstance( - RegisterApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setQueue("dummyQueue"); - return resp; - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - FinishApplicationMasterResponse resp = factory.newRecordInstance( - FinishApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setIsUnregistered(false); - return resp; - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - 
YarnException, IOException { - AllocateResponse response = factory.newRecordInstance - (AllocateResponse.class); - response.setNumClusterNodes(12345); - return response; - } - - @Override - public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws - YarnException, IOException { - DistSchedRegisterResponse resp = factory.newRecordInstance( - DistSchedRegisterResponse.class); - resp.setContainerIdStart(54321l); - return resp; - } - - @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { - DistSchedAllocateResponse resp = - factory.newRecordInstance(DistSchedAllocateResponse.class); - resp.setNodesForScheduling( - Arrays.asList(NodeId.newInstance("h1", 1234))); - return resp; - } - }; - Server server = service.getServer(rpc, conf, addr, null); - server.start(); - - // Verify that the DistrubutedSchedulingService can handle vanilla - // ApplicationMasterProtocol clients - RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, - ProtobufRpcEngine.class); - ApplicationMasterProtocol ampProxy = - (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol - .class, NetUtils.getConnectAddress(server), conf); - RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); - Assert.assertEquals("dummyQueue", regResp.getQueue()); - FinishApplicationMasterResponse finishResp = ampProxy - .finishApplicationMaster(factory.newRecordInstance( - FinishApplicationMasterRequest.class)); - Assert.assertEquals(false, finishResp.getIsUnregistered()); - AllocateResponse allocResp = ampProxy - .allocate(factory.newRecordInstance(AllocateRequest.class)); - Assert.assertEquals(12345, allocResp.getNumClusterNodes()); - - - // Verify that the DistrubutedSchedulingService can handle the - // DistributedSchedulerProtocol clients 
as well - RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class, - ProtobufRpcEngine.class); - DistributedSchedulerProtocol dsProxy = - (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol - .class, NetUtils.getConnectAddress(server), conf); - - DistSchedRegisterResponse dsRegResp = - dsProxy.registerApplicationMasterForDistributedScheduling( - factory.newRecordInstance(RegisterApplicationMasterRequest.class)); - Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); - DistSchedAllocateResponse dsAllocResp = - dsProxy.allocateForDistributedScheduling( - factory.newRecordInstance(AllocateRequest.class)); - Assert.assertEquals( - "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index d9306dd..429817e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -44,8 +44,8 @@ public class TestRMDispatcher { AsyncDispatcher rmDispatcher = new AsyncDispatcher(); CapacityScheduler sched = spy(new CapacityScheduler()); YarnConfiguration conf = new YarnConfiguration(); - SchedulerEventDispatcher schedulerDispatcher = - new SchedulerEventDispatcher(sched); + EventDispatcher schedulerDispatcher = + new EventDispatcher(sched, sched.getClass().getName()); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); rmDispatcher.init(conf); rmDispatcher.start(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index e0fd9ab..119ae09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -987,7 +988,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm = new MockRM() { @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { scheduler.handle(event); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index f4cb3b3..458f94d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.security.PrivilegedExceptionAction; import java.util.List; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates { } @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { - scheduler.handle(event); + super.handle(event); } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 5035afe..e5ba470 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -423,10 +424,11 @@ public class TestAMRestart { MockRM rm1 = new MockRM(conf, memStore) { @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { - scheduler.handle(event); + super.handle(event); } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java new file mode 100644 index 0000000..35ab6a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; +import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .AMLivelinessMonitor; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .DistributedSchedulingService; +import org.junit.Assert; +import org.junit.Test; + 
+import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays;

/**
 * Checks that one {@link DistributedSchedulingService} RPC server answers
 * both ApplicationMasterProtocol and DistributedSchedulerProtocol clients.
 */
public class TestDistributedSchedulingService {

  /**
   * Starts a DistributedSchedulingService whose handlers return canned
   * responses, calls it through an ApplicationMasterProtocol proxy and then
   * through a DistributedSchedulerProtocol proxy, and asserts that each canned
   * value is returned by the corresponding proxy call.
   *
   * @throws Exception if the server cannot be created or any RPC call fails
   */
  @Test
  public void testRPCWrapping() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.IPC_RPC_IMPL,
        HadoopYarnProtoRPC.class.getName());
    YarnRPC rpc = YarnRPC.create(conf);
    String bindAddr = "localhost:0";
    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);

    // Context stub: only the two accessors overridden here are customised.
    final RMContext rmContext = new RMContextImpl() {
      @Override
      public AMLivelinessMonitor getAMLivelinessMonitor() {
        return null;
      }

      @Override
      public Configuration getYarnConfiguration() {
        return new YarnConfiguration();
      }
    };

    // Every handler returns a recognisable marker value so the assertions
    // below can tell that the response came from this instance.
    DistributedSchedulingService service =
        new DistributedSchedulingService(rmContext, null) {
          @Override
          public RegisterApplicationMasterResponse registerApplicationMaster(
              RegisterApplicationMasterRequest request)
              throws YarnException, IOException {
            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
                RegisterApplicationMasterResponse.class);
            // Dummy Entry to Assert that we get this object back
            resp.setQueue("dummyQueue");
            return resp;
          }

          @Override
          public FinishApplicationMasterResponse finishApplicationMaster(
              FinishApplicationMasterRequest request)
              throws YarnException, IOException {
            FinishApplicationMasterResponse resp = factory.newRecordInstance(
                FinishApplicationMasterResponse.class);
            // Dummy Entry to Assert that we get this object back
            resp.setIsUnregistered(false);
            return resp;
          }

          @Override
          public AllocateResponse allocate(AllocateRequest request)
              throws YarnException, IOException {
            AllocateResponse response =
                factory.newRecordInstance(AllocateResponse.class);
            response.setNumClusterNodes(12345);
            return response;
          }

          @Override
          public DistSchedRegisterResponse
              registerApplicationMasterForDistributedScheduling(
                  RegisterApplicationMasterRequest request)
              throws YarnException, IOException {
            DistSchedRegisterResponse resp = factory.newRecordInstance(
                DistSchedRegisterResponse.class);
            resp.setContainerIdStart(54321L);
            return resp;
          }

          @Override
          public DistSchedAllocateResponse allocateForDistributedScheduling(
              AllocateRequest request) throws YarnException, IOException {
            DistSchedAllocateResponse resp =
                factory.newRecordInstance(DistSchedAllocateResponse.class);
            resp.setNodesForScheduling(
                Arrays.asList(NodeId.newInstance("h1", 1234)));
            return resp;
          }
        };

    Server server = service.getServer(rpc, conf, addr, null);
    server.start();
    try {
      // Verify that the DistributedSchedulingService can handle vanilla
      // ApplicationMasterProtocol clients
      RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
          ProtobufRpcEngine.class);
      ApplicationMasterProtocol ampProxy =
          (ApplicationMasterProtocol) rpc.getProxy(
              ApplicationMasterProtocol.class,
              NetUtils.getConnectAddress(server), conf);
      RegisterApplicationMasterResponse regResp =
          ampProxy.registerApplicationMaster(
              factory.newRecordInstance(
                  RegisterApplicationMasterRequest.class));
      Assert.assertEquals("dummyQueue", regResp.getQueue());
      FinishApplicationMasterResponse finishResp =
          ampProxy.finishApplicationMaster(
              factory.newRecordInstance(FinishApplicationMasterRequest.class));
      Assert.assertFalse(finishResp.getIsUnregistered());
      AllocateResponse allocResp =
          ampProxy.allocate(factory.newRecordInstance(AllocateRequest.class));
      Assert.assertEquals(12345, allocResp.getNumClusterNodes());

      // Verify that the DistributedSchedulingService can handle the
      // DistributedSchedulerProtocol clients as well
      RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
          ProtobufRpcEngine.class);
      DistributedSchedulerProtocol dsProxy =
          (DistributedSchedulerProtocol) rpc.getProxy(
              DistributedSchedulerProtocol.class,
              NetUtils.getConnectAddress(server), conf);

      DistSchedRegisterResponse dsRegResp =
          dsProxy.registerApplicationMasterForDistributedScheduling(
              factory.newRecordInstance(
                  RegisterApplicationMasterRequest.class));
      Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
      DistSchedAllocateResponse dsAllocResp =
          dsProxy.allocateForDistributedScheduling(
              factory.newRecordInstance(AllocateRequest.class));
      Assert.assertEquals(
          "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
    } finally {
      // Stop the server even when an assertion fails so that it does not
      // outlive this test.
      server.stop();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java new file mode 100644 index 0000000..a21ae19 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.List;

/**
 * Tests the node ordering returned by {@link TopKNodeSelector} for the
 * WAIT_TIME and QUEUE_LENGTH comparators, with and without selection hints.
 */
public class TestTopKNodeSelector {

  /**
   * NodeId whose host and port are fixed at construction; the setters and
   * {@code build()} are no-ops and {@code toString()} yields "host:port".
   */
  static class FakeNodeId extends NodeId {
    final String host;
    final int port;

    public FakeNodeId(String host, int port) {
      this.host = host;
      this.port = port;
    }

    @Override
    public String toString() {
      return host + ":" + port;
    }

    @Override
    public String getHost() {
      return host;
    }

    @Override
    public int getPort() {
      return port;
    }

    @Override
    protected void setHost(String host) {}

    @Override
    protected void setPort(int port) {}

    @Override
    protected void build() {}
  }

  /**
   * Feeds wait-time updates to a WAIT_TIME selector and checks the returned
   * order after the initial load, after lowering h3's wait time, and after
   * an update that carries a wait time of -1.
   */
  @Test
  public void testQueueTimeSort() {
    TopKNodeSelector topK = new TopKNodeSelector(5,
        TopKNodeSelector.TopKComparator.WAIT_TIME);
    topK.nodeUpdate(createRMNode("h1", 1, 15, 10));
    topK.nodeUpdate(createRMNode("h2", 2, 5, 10));
    topK.nodeUpdate(createRMNode("h3", 3, 10, 10));
    topK.computeTask.run();
    List<NodeId> ranked = topK.selectNodes();
    System.out.println("1-> " + ranked);
    assertOrder(ranked, "h2:2", "h3:3", "h1:1");

    // h3 now reports the smallest wait time
    topK.nodeUpdate(createRMNode("h3", 3, 2, 10));
    topK.computeTask.run();
    ranked = topK.selectNodes();
    System.out.println("2-> " + ranked);
    assertOrder(ranked, "h3:3", "h2:2", "h1:1");

    // h4 reports a wait time of -1; the first three positions must not move
    topK.nodeUpdate(createRMNode("h4", 4, -1, 10));
    topK.computeTask.run();
    ranked = topK.selectNodes();
    System.out.println("3-> " + ranked);
    assertOrder(ranked, "h3:3", "h2:2", "h1:1");
  }

  /**
   * Loads six nodes into a selector created with k = 3 and checks the
   * un-hinted result, then checks that hinted nodes are listed ahead of it.
   */
  @Test
  public void testSelectionHints() {
    TopKNodeSelector topK = new TopKNodeSelector(3,
        TopKNodeSelector.TopKComparator.WAIT_TIME);
    topK.nodeUpdate(createRMNode("h1", 1, 100, 10));
    topK.nodeUpdate(createRMNode("h2", 2, 5, 10));
    topK.nodeUpdate(createRMNode("h3", 3, 95, 10));
    topK.nodeUpdate(createRMNode("h4", 4, 10, 10));
    topK.nodeUpdate(createRMNode("h5", 5, 90, 10));
    topK.nodeUpdate(createRMNode("h6", 6, 15, 10));
    topK.computeTask.run();

    List<NodeId> ranked = topK.selectNodes();
    System.out.println("1-> " + ranked);
    assertOrder(ranked, "h2:2", "h4:4", "h6:6");

    // Two hints of one node each
    ranked = topK.selectNodes(
        Arrays.asList(
            hint(1, new FakeNodeId("h1", 1)),
            hint(1, new FakeNodeId("h3", 3))));
    System.out.println("2-> " + ranked);
    // Ensure the hinted nodes are on the top
    assertOrder(ranked, "h3:3", "h1:1", "h2:2", "h4:4", "h6:6");

    // One hint naming three candidate nodes with a count of two
    ranked = topK.selectNodes(
        Arrays.asList(
            hint(2,
                new FakeNodeId("h1", 1),
                new FakeNodeId("h3", 3),
                new FakeNodeId("h5", 5))));
    System.out.println("3-> " + ranked);
    // Ensure the hinted nodes are on the top
    assertOrder(ranked, "h5:5", "h3:3", "h2:2", "h4:4", "h6:6");
  }

  /**
   * Feeds queue-length updates (wait time always -1) to a QUEUE_LENGTH
   * selector and checks the returned order after each round of updates.
   */
  @Test
  public void testQueueLengthSort() {
    TopKNodeSelector topK = new TopKNodeSelector(5,
        TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
    topK.nodeUpdate(createRMNode("h1", 1, -1, 15));
    topK.nodeUpdate(createRMNode("h2", 2, -1, 5));
    topK.nodeUpdate(createRMNode("h3", 3, -1, 10));
    topK.computeTask.run();
    List<NodeId> ranked = topK.selectNodes();
    System.out.println("1-> " + ranked);
    assertOrder(ranked, "h2:2", "h3:3", "h1:1");

    // h3 now reports the shortest queue
    topK.nodeUpdate(createRMNode("h3", 3, -1, 2));
    topK.computeTask.run();
    ranked = topK.selectNodes();
    System.out.println("2-> " + ranked);
    assertOrder(ranked, "h3:3", "h2:2", "h1:1");

    // h4 has wait time -1 but a valid queue length of 20: the existing
    // order is kept and h4 is expected in fourth place
    topK.nodeUpdate(createRMNode("h4", 4, -1, 20));
    topK.computeTask.run();
    ranked = topK.selectNodes();
    System.out.println("3-> " + ranked);
    assertOrder(ranked, "h3:3", "h2:2", "h1:1", "h4:4");
  }

  /**
   * Asserts, position by position from index 0, that each node id's string
   * form equals the expected "host:port" value.
   */
  private static void assertOrder(List<NodeId> actual, String... expected) {
    for (int i = 0; i < expected.length; i++) {
      Assert.assertEquals(expected[i], actual.get(i).toString());
    }
  }

  /** Wraps the given node ids and count in a SelectionHint. */
  private static NodeSelector.SelectionHint hint(int count, NodeId... nodes) {
    return new NodeSelector.SelectionHint(Arrays.asList(nodes), count);
  }

  /**
   * Builds a mocked RMNode whose node id is a FakeNodeId for the given host
   * and port, and whose mocked QueuedContainersStatus returns the given
   * estimated queue wait time and wait-queue length.
   */
  private RMNode createRMNode(String host, int port,
      int waitTime, int queueLength) {
    QueuedContainersStatus queueStatus =
        Mockito.mock(QueuedContainersStatus.class);
    Mockito.when(queueStatus.getEstimatedQueueWaitTime())
        .thenReturn(waitTime);
    Mockito.when(queueStatus.getWaitQueueLength()).thenReturn(queueLength);

    NodeId nodeId = new FakeNodeId(host, port);
    RMNode rmNode = Mockito.mock(RMNode.class);
    Mockito.when(rmNode.getNodeID()).thenReturn(nodeId);
    Mockito.when(rmNode.getQueuedContainersStatus()).thenReturn(queueStatus);
    return rmNode;
  }
}
