Repository: hadoop Updated Branches: refs/heads/trunk 8f0d3d69d -> 8fbe6ece2
YARN-5350. Distributed Scheduling: Ensure sort order of allocatable nodes returned by the RM is not lost. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8fbe6ece Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8fbe6ece Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8fbe6ece Branch: refs/heads/trunk Commit: 8fbe6ece24e38ee24fee0abdbed5f7dc5d3c16da Parents: 8f0d3d6 Author: Arun Suresh <[email protected]> Authored: Tue Jul 19 23:03:58 2016 -0700 Committer: Arun Suresh <[email protected]> Committed: Tue Jul 19 23:03:58 2016 -0700 ---------------------------------------------------------------------- .../nodemanager/scheduler/LocalScheduler.java | 3 +- .../scheduler/TestLocalScheduler.java | 209 ++++++++++++------- 2 files changed, 130 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fbe6ece/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index 10c1361..ec0e8a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -112,7 +113,7 @@ public final class LocalScheduler extends AbstractRequestInterceptor { private DistSchedulerParams appParams = new DistSchedulerParams(); private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter = new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map<String, NodeId> nodeList = new HashMap<>(); + private Map<String, NodeId> nodeList = new LinkedHashMap<>(); // Mapping of NodeId to NodeTokens. Populated either from RM response or // generated locally if required. http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fbe6ece/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index 31f8085..8de849b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -61,6 +61,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public class TestLocalScheduler { @@ -70,61 +71,12 @@ public class TestLocalScheduler { Configuration conf = new Configuration(); LocalScheduler localScheduler = new LocalScheduler(); - NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); - Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); - Context context = Mockito.mock(Context.class); - NMContainerTokenSecretManager nmContainerTokenSecretManager = new - NMContainerTokenSecretManager(conf); - MasterKey mKey = new MasterKey() { - @Override - public int getKeyId() { - return 1; - } - @Override - public void setKeyId(int keyId) {} - @Override - public ByteBuffer getBytes() { - return ByteBuffer.allocate(8); - } - @Override - public void setBytes(ByteBuffer bytes) {} - }; - nmContainerTokenSecretManager.setMasterKey(mKey); - Mockito.when(context.getContainerTokenSecretManager()).thenReturn - (nmContainerTokenSecretManager); - OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); - - NMTokenSecretManagerInNM nmTokenSecretManagerInNM = - new NMTokenSecretManagerInNM(); - nmTokenSecretManagerInNM.setMasterKey(mKey); - localScheduler.initLocal( - ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), - containerAllocator, nmTokenSecretManagerInNM, "test"); - - RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); - localScheduler.setNextInterceptor(finalReqIntcptr); + RequestInterceptor finalReqIntcptr = setup(conf, localScheduler); - DistSchedRegisterResponse distSchedRegisterResponse = - Records.newRecord(DistSchedRegisterResponse.class); - distSchedRegisterResponse.setRegisterResponse( - Records.newRecord(RegisterApplicationMasterResponse.class)); - distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); - distSchedRegisterResponse.setContainerIdStart(0); - distSchedRegisterResponse.setMaxAllocatableCapabilty( - Resource.newInstance(1024, 4)); - distSchedRegisterResponse.setMinAllocatableCapabilty( - Resource.newInstance(512, 2)); - distSchedRegisterResponse.setNodesForScheduling(Arrays.asList( + registerAM(localScheduler, finalReqIntcptr, Arrays.asList( NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); - Mockito.when( - finalReqIntcptr.registerApplicationMasterForDistributedScheduling( - Mockito.any(RegisterApplicationMasterRequest.class))) - .thenReturn(distSchedRegisterResponse); - - localScheduler.registerApplicationMaster( - Records.newRecord(RegisterApplicationMasterRequest.class)); + final AtomicBoolean flipFlag = new AtomicBoolean(false); Mockito.when( finalReqIntcptr.allocateForDistributedScheduling( Mockito.any(DistSchedAllocateRequest.class))) @@ -132,36 +84,33 @@ public class TestLocalScheduler { @Override public DistSchedAllocateResponse answer(InvocationOnMock invocationOnMock) throws Throwable { - return createAllocateResponse(Arrays.asList( - NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + flipFlag.set(!flipFlag.get()); + if (flipFlag.get()) { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + } else { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("d", 4), NodeId.newInstance("c", 3))); + } } }); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class); - guaranteedReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true)); - guaranteedReq.setNumContainers(5); - guaranteedReq.setCapability(Resource.newInstance(2048, 2)); - guaranteedReq.setRelaxLocality(true); - guaranteedReq.setResourceName("*"); - ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); - opportunisticReq.setNumContainers(4); - opportunisticReq.setCapability(Resource.newInstance(1024, 4)); - opportunisticReq.setPriority(Priority.newInstance(100)); - opportunisticReq.setRelaxLocality(true); - opportunisticReq.setResourceName("*"); + ResourceRequest guaranteedReq = + createResourceRequest(ExecutionType.GUARANTEED, 5, "*"); + + ResourceRequest opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*"); allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); // Verify 4 containers were allocated - AllocateResponse allocateResponse = localScheduler.allocate(allocateRequest); + AllocateResponse allocateResponse = + localScheduler.allocate(allocateRequest); Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); // Verify equal distribution on hosts a and b // And None on c and d - Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse); + Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4); Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); @@ -169,14 +118,8 @@ public class TestLocalScheduler { // New Allocate request allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); - opportunisticReq.setNumContainers(6); - opportunisticReq.setCapability(Resource.newInstance(512, 3)); - opportunisticReq.setPriority(Priority.newInstance(100)); - opportunisticReq.setRelaxLocality(true); - opportunisticReq.setResourceName("*"); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); // Verify 6 containers were allocated @@ -185,11 +128,113 @@ public class TestLocalScheduler { // Verify New containers are equally distribution on hosts c and d // And None on a and b - allocs = mapAllocs(allocateResponse); + allocs = mapAllocs(allocateResponse, 6); Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); + + // Ensure the LocalScheduler respects the list order.. + // The first request should be allocated to "d" since it is ranked higher + // The second request should be allocated to "c" since the ranking is + // flipped on every allocate response. + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = localScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + } + + private void registerAM(LocalScheduler localScheduler, RequestInterceptor + finalReqIntcptr, List<NodeId> nodeList) throws Exception { + DistSchedRegisterResponse distSchedRegisterResponse = + Records.newRecord(DistSchedRegisterResponse.class); + distSchedRegisterResponse.setRegisterResponse( + Records.newRecord(RegisterApplicationMasterResponse.class)); + distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); + distSchedRegisterResponse.setContainerIdStart(0); + distSchedRegisterResponse.setMaxAllocatableCapabilty( + Resource.newInstance(1024, 4)); + distSchedRegisterResponse.setMinAllocatableCapabilty( + Resource.newInstance(512, 2)); + distSchedRegisterResponse.setNodesForScheduling(nodeList); + Mockito.when( + finalReqIntcptr.registerApplicationMasterForDistributedScheduling( + Mockito.any(RegisterApplicationMasterRequest.class))) + .thenReturn(distSchedRegisterResponse); + + localScheduler.registerApplicationMaster( + Records.newRecord(RegisterApplicationMasterRequest.class)); + } + + private RequestInterceptor setup(Configuration conf, LocalScheduler + localScheduler) { + NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); + Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); + Context context = Mockito.mock(Context.class); + NMContainerTokenSecretManager nmContainerTokenSecretManager = new + NMContainerTokenSecretManager(conf); + MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + nmContainerTokenSecretManager.setMasterKey(mKey); + Mockito.when(context.getContainerTokenSecretManager()).thenReturn + (nmContainerTokenSecretManager); + OpportunisticContainerAllocator containerAllocator = + new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + + NMTokenSecretManagerInNM nmTokenSecretManagerInNM = + new NMTokenSecretManagerInNM(); + nmTokenSecretManagerInNM.setMasterKey(mKey); + localScheduler.initLocal( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), + containerAllocator, nmTokenSecretManagerInNM, "test"); + + RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); + localScheduler.setNextInterceptor(finalReqIntcptr); + return finalReqIntcptr; + } + + private ResourceRequest createResourceRequest(ExecutionType execType, + int numContainers, String resourceName) { + ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); + opportunisticReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(execType, true)); + opportunisticReq.setNumContainers(numContainers); + opportunisticReq.setCapability(Resource.newInstance(1024, 4)); + opportunisticReq.setPriority(Priority.newInstance(100)); + opportunisticReq.setRelaxLocality(true); + opportunisticReq.setResourceName(resourceName); + return opportunisticReq; } private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) { @@ -202,7 +247,9 @@ public class TestLocalScheduler { } private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse - allocateResponse) throws Exception { + allocateResponse, int expectedSize) throws Exception { + Assert.assertEquals(expectedSize, + allocateResponse.getAllocatedContainers().size()); Map<NodeId, List<ContainerId>> allocs = new HashMap<>(); for (Container c : allocateResponse.getAllocatedContainers()) { ContainerTokenIdentifier cTokId = BuilderUtils --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
