http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java new file mode 100644 index 0000000..94cb28e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java @@ -0,0 +1,784 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Class that tests the allocation of OPPORTUNISTIC containers through the + * centralized ResourceManager. + */ +public class TestOpportunisticContainerAllocationE2E { + private static Configuration conf = null; + private static MiniYARNCluster yarnCluster = null; + private static YarnClient yarnClient = null; + private static List<NodeReport> nodeReports = null; + private static int nodeCount = 3; + + private static final int ROLLING_INTERVAL_SEC = 13; + private static final long AM_EXPIRE_MS = 4000; + + private static Resource capability; + private static ProfileCapability profileCapability; + private static Priority priority; + private static Priority priority2; + private static Priority priority3; + private static Priority priority4; + private static String node; + private static String rack; + private static String[] nodes; + private static String[] racks; + private final static int DEFAULT_ITERATION = 3; + + // Per test.. 
+ private ApplicationAttemptId attemptId = null; + private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null; + private long availMB; + private int availVCores; + private long allocMB; + private int allocVCores; + + @BeforeClass + public static void setup() throws Exception { + // start minicluster + conf = new YarnConfiguration(); + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + ROLLING_INTERVAL_SEC); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000); + // set the minimum allocation so that resource decrease can go under 1024 + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + yarnCluster = + new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + // get node info + nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + + priority = Priority.newInstance(1); + priority2 = Priority.newInstance(2); + priority3 = Priority.newInstance(3); + priority4 = Priority.newInstance(4); + capability = Resource.newInstance(512, 1); + profileCapability = ProfileCapability.newInstance(capability); + + node = nodeReports.get(0).getNodeId().getHost(); + rack = nodeReports.get(0).getRackName(); + nodes = new String[]{node}; + racks = new String[]{rack}; + } + + @Before + public void startApp() throws Exception { + // submit new app + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( + Collections.<String, LocalResource>emptyMap(), + new HashMap<String, String>(), Arrays.asList("sleep", "100"), + new HashMap<String, ByteBuffer>(), null, + new HashMap<ApplicationAccessType, String>()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = + Records.newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + RMAppAttempt appAttempt = null; + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == + YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + while (true) { + if 
(appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + break; + } + } + break; + } + } + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken() + .setService(ClientRMProxy.getAMRMTokenService(conf)); + + // start am rm client + amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient + .createAMRMClient(); + + //setting an instance NMTokenCache + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); + } + + @After + public void cancelApp() throws YarnException, IOException { + try { + amClient + .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, + null); + } finally { + if (amClient != null && + amClient.getServiceState() == Service.STATE.STARTED) { + amClient.stop(); + } + } + yarnClient.killApplication(attemptId.getApplicationId()); + attemptId = null; + } + + @AfterClass + public static void tearDown() { + if (yarnClient != null && + yarnClient.getServiceState() == Service.STATE.STARTED) { + yarnClient.stop(); + } + if (yarnCluster != null && + yarnCluster.getServiceState() == Service.STATE.STARTED) { + yarnCluster.stop(); + } + } + + @Test(timeout = 60000) + public void testPromotionFromAcquired() throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + .getNumContainers(); + + assertEquals(1, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>(); + int iterationsLeft = 50; + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + updateMetrics("Before Opp Allocation"); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + allocatedOpportContainers.put(container.getId(), container); + removeCR(container); + } + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + 
receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size()); + + updateMetrics("After Opp Allocation / Before Promotion"); + + try { + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + Assert.fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "target should be GUARANTEED and original should be OPPORTUNISTIC")); + } + + Container c = allocatedOpportContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + iterationsLeft = 120; + Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + + updateMetrics("After Promotion"); + + assertEquals(1, updatedContainers.size()); + for (ContainerId cId : allocatedOpportContainers.keySet()) { + Container orig = allocatedOpportContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.GUARANTEED, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + amClient.ask.clear(); + } + + @Test(timeout = 60000) + public void testDemotionFromAcquired() throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority3)); + + int guarContainersRequestedAny = amClient.getTable(0).get(priority3, + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + .remoteRequest.getNumContainers(); + + assertEquals(1, guarContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>(); + int iterationsLeft = 50; + + 
amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + updateMetrics("Before Guar Allocation"); + + while (allocatedContainerCount < guarContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + allocatedGuarContainers.put(container.getId(), container); + removeCR(container); + } + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < guarContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(guarContainersRequestedAny, allocatedContainerCount); + assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size()); + + updateMetrics("After Guar Allocation / Before Demotion"); + + try { + Container c = allocatedGuarContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + Assert.fail("Should throw Exception.."); + } catch (IllegalArgumentException e) { + System.out.println("## " + e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "target should be OPPORTUNISTIC and original should be GUARANTEED")); + } + + Container c = allocatedGuarContainers.values().iterator().next(); + amClient.requestContainerUpdate( + c, UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC)); + iterationsLeft = 120; + Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + if (allocResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocResponse + .getUpdatedContainers()) { + System.out.println("Got update.."); + updatedContainers.put(updatedContainer.getContainer().getId(), + updatedContainer); + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + + updateMetrics("After Demotion"); + + assertEquals(1, updatedContainers.size()); + for (ContainerId cId : allocatedGuarContainers.keySet()) { + Container orig = allocatedGuarContainers.get(cId); + UpdatedContainer updatedContainer = updatedContainers.get(cId); + assertNotNull(updatedContainer); + assertEquals(ExecutionType.OPPORTUNISTIC, + updatedContainer.getContainer().getExecutionType()); + assertEquals(orig.getResource(), + updatedContainer.getContainer().getResource()); + assertEquals(orig.getNodeId(), + updatedContainer.getContainer().getNodeId()); + assertEquals(orig.getVersion() + 1, + updatedContainer.getContainer().getVersion()); + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + 
amClient.ask.clear(); + } + + @Test(timeout = 60000) + public void testMixedAllocationAndRelease() throws YarnException, + IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int containersRequestedNode = amClient.getTable(0).get(priority, + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest + .getNumContainers(); + int containersRequestedRack = amClient.getTable(0).get(priority, + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest + .getNumContainers(); + int containersRequestedAny = amClient.getTable(0).get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + .remoteRequest.getNumContainers(); + int oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + .getNumContainers(); + + assertEquals(4, containersRequestedNode); + assertEquals(4, containersRequestedRack); + assertEquals(4, containersRequestedAny); + assertEquals(2, oppContainersRequestedAny); + + assertEquals(4, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + containersRequestedNode = amClient.getTable(0).get(priority, + node, ExecutionType.GUARANTEED, profileCapability).remoteRequest + .getNumContainers(); + containersRequestedRack = amClient.getTable(0).get(priority, + rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest + .getNumContainers(); + containersRequestedAny = amClient.getTable(0).get(priority, + ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + .remoteRequest.getNumContainers(); + oppContainersRequestedAny = + amClient.getTable(0).get(priority2, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + .getNumContainers(); + + assertEquals(2, containersRequestedNode); + assertEquals(2, containersRequestedRack); + assertEquals(2, containersRequestedAny); + assertEquals(1, oppContainersRequestedAny); + + assertEquals(4, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int allocatedOpportContainerCount = 0; + int iterationsLeft = 50; + Set<ContainerId> releases = new TreeSet<>(); + + 
amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < + containersRequestedAny + oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + allocatedContainerCount += + allocResponse.getAllocatedContainers().size(); + for (Container container : allocResponse.getAllocatedContainers()) { + if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + allocatedOpportContainerCount++; + } + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < containersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(containersRequestedAny + oppContainersRequestedAny, + allocatedContainerCount); + assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount); + for (ContainerId rejectContainerId : releases) { + amClient.releaseAssignedContainer(rejectContainerId); + } + assertEquals(3, amClient.release.size()); + assertEquals(0, amClient.ask.size()); + + // need to tell the AMRMClient that we don't need these resources anymore + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + assertEquals(4, amClient.ask.size()); + + iterationsLeft = 3; + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + assertEquals(0, allocResponse.getAllocatedContainers().size()); + if (allocResponse.getCompletedContainersStatuses().size() > 0) { + for (ContainerStatus cStatus : allocResponse + .getCompletedContainersStatuses()) { + if (releases.contains(cStatus.getContainerId())) { + assertEquals(cStatus.getState(), ContainerState.COMPLETE); + assertEquals(-100, cStatus.getExitStatus()); + releases.remove(cStatus.getContainerId()); + } + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + } + + /** + * Tests allocation with requests comprising only opportunistic containers. 
+ */ + @Test(timeout = 60000) + public void testOpportunisticAllocation() throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = amClient.getTable(0) + .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, + profileCapability).remoteRequest.getNumContainers(); + + assertEquals(2, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 10; + Set<ContainerId> releases = new TreeSet<>(); + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap<String, Token> receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + for (Container container : allocResponse.getAllocatedContainers()) { + allocatedContainerCount++; + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + assertEquals(1, receivedNMTokens.values().size()); + } + + private void removeCR(Container container) { + List<? 
extends Collection<AMRMClient.ContainerRequest>> + matchingRequests = amClient.getMatchingRequests(container + .getPriority(), + ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, + container.getResource()); + Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>(); + for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) { + for (AMRMClient.ContainerRequest cr : rc) { + toRemove.add(cr); + } + } + for (AMRMClient.ContainerRequest cr : toRemove) { + amClient.removeContainerRequest(cr); + } + } + + private void updateMetrics(String msg) { + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler)yarnCluster.getResourceManager() + .getResourceScheduler(); + availMB = scheduler.getRootQueueMetrics().getAvailableMB(); + availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores(); + allocMB = scheduler.getRootQueueMetrics().getAllocatedMB(); + allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); + System.out.println("## METRICS (" + msg + ")==>"); + System.out.println(" : availMB=" + availMB + ", " + + "availVCores=" +availVCores + ", " + + "allocMB=" + allocMB + ", " + + "allocVCores=" + allocVCores + ", "); + System.out.println("<== ##"); + } + + private void sleep(int sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +}
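A condensed sketch of the request-then-promote flow exercised by the test above may help when skimming it. The AMRMClient calls mirror those used in the test; the class name and helper method are illustrative, and registration and teardown of the client are assumed to have happened elsewhere.

import java.io.IOException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.exceptions.YarnException;

// Illustrative sketch only; not part of the committed test.
public class OpportunisticRequestSketch {

  // Assumes amClient is already init()-ed, start()-ed and registered.
  static void requestAndPromote(AMRMClient<AMRMClient.ContainerRequest> amClient)
      throws YarnException, IOException, InterruptedException {
    Resource capability = Resource.newInstance(512, 1);
    Priority priority = Priority.newInstance(2);

    // Ask for one OPPORTUNISTIC container at any location.
    amClient.addContainerRequest(
        new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
            true, null,
            ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)));

    // Poll allocate() until the container arrives; opportunistic containers
    // are handed out on NM heartbeats, so a short wait between calls is normal.
    Container allocated = null;
    while (allocated == null) {
      AllocateResponse response = amClient.allocate(0.1f);
      for (Container c : response.getAllocatedContainers()) {
        if (c.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
          allocated = c;
        }
      }
      if (allocated == null) {
        Thread.sleep(100);
      }
    }

    // Request promotion to GUARANTEED; the result shows up in a later
    // allocate() response as an UpdatedContainer with the version bumped by one.
    amClient.requestContainerUpdate(allocated,
        UpdateContainerRequest.newInstance(allocated.getVersion(),
            allocated.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
            null, ExecutionType.GUARANTEED));
  }
}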
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java index e403a12..f621aa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -47,6 +47,24 @@ public abstract class RemoteNode implements Comparable<RemoteNode> { } /** + * Create new Instance. + * @param nodeId NodeId. + * @param httpAddress Http address. + * @param rackName Rack Name. + * @return RemoteNode instance. + */ + @Private + @Unstable + public static RemoteNode newInstance(NodeId nodeId, String httpAddress, + String rackName) { + RemoteNode remoteNode = Records.newRecord(RemoteNode.class); + remoteNode.setNodeId(nodeId); + remoteNode.setHttpAddress(httpAddress); + remoteNode.setRackName(rackName); + return remoteNode; + } + + /** * Get {@link NodeId}. * @return NodeId. */ @@ -79,6 +97,22 @@ public abstract class RemoteNode implements Comparable<RemoteNode> { public abstract void setHttpAddress(String httpAddress); /** + * Get Rack Name. + * @return Rack Name. + */ + @Private + @Unstable + public abstract String getRackName(); + + /** + * Set Rack Name. + * @param rackName Rack Name. + */ + @Private + @Unstable + public abstract void setRackName(String rackName); + + /** * Use the underlying {@link NodeId} comparator. * @param other RemoteNode. * @return Comparison. @@ -92,6 +126,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> { public String toString() { return "RemoteNode{" + "nodeId=" + getNodeId() + ", " + + "rackName=" + getRackName() + ", " + "httpAddress=" + getHttpAddress() + "}"; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java index 3e4fd4a..c2492cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java @@ -118,6 +118,25 @@ public class RemoteNodePBImpl extends RemoteNode { } @Override + public String getRackName() { + RemoteNodeProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasRackName()) { + return null; + } + return (p.getRackName()); + } + + @Override + public void setRackName(String rackName) { + maybeInitBuilder(); + if (rackName == null) { + builder.clearRackName(); + return; + } + builder.setRackName(rackName); + } + + @Override public int hashCode() { return getProto().hashCode(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 782dc02..ede4958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -45,11 +45,14 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -61,6 +64,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class OpportunisticContainerAllocator { + private static final int NODE_LOCAL_LOOP = 0; + private static final int RACK_LOCAL_LOOP = 1; + private static final int OFF_SWITCH_LOOP = 2; + /** * This class encapsulates application specific parameters used to build a * Container. @@ -70,6 +77,7 @@ public class OpportunisticContainerAllocator { private Resource minResource; private Resource incrementResource; private int containerTokenExpiryInterval; + private int maxAllocationsPerSchedulerKeyPerRound = 1; /** * Return Max Resource. @@ -135,6 +143,24 @@ public class OpportunisticContainerAllocator { int containerTokenExpiryInterval) { this.containerTokenExpiryInterval = containerTokenExpiryInterval; } + + /** + * Get the Max Allocations per Scheduler Key per allocation round. + * @return maxAllocationsPerSchedulerKeyPerRound. + */ + public int getMaxAllocationsPerSchedulerKeyPerRound() { + return maxAllocationsPerSchedulerKeyPerRound; + } + + /** + * Set the Max Allocations per Scheduler Key per allocation round. + * @param maxAllocationsPerSchedulerKeyPerRound val. 
+ */ + public void setMaxAllocationsPerSchedulerKeyPerRound( + int maxAllocationsPerSchedulerKeyPerRound) { + this.maxAllocationsPerSchedulerKeyPerRound = + maxAllocationsPerSchedulerKeyPerRound; + } } /** @@ -188,6 +214,72 @@ public class OpportunisticContainerAllocator { private final BaseContainerTokenSecretManager tokenSecretManager; + static class Allocation { + private final Container container; + private final String resourceName; + + Allocation(Container container, String resourceName) { + this.container = container; + this.resourceName = resourceName; + } + + Container getContainer() { + return container; + } + + String getResourceName() { + return resourceName; + } + } + + static class EnrichedResourceRequest { + private final Map<String, AtomicInteger> nodeLocations = new HashMap<>(); + private final Map<String, AtomicInteger> rackLocations = new HashMap<>(); + private final ResourceRequest request; + + EnrichedResourceRequest(ResourceRequest request) { + this.request = request; + } + + ResourceRequest getRequest() { + return request; + } + + void addLocation(String location, int count) { + Map<String, AtomicInteger> m = rackLocations; + if (!location.startsWith("/")) { + m = nodeLocations; + } + if (count == 0) { + m.remove(location); + } else { + m.put(location, new AtomicInteger(count)); + } + } + + void removeLocation(String location) { + Map<String, AtomicInteger> m = rackLocations; + AtomicInteger count = m.get(location); + if (count == null) { + m = nodeLocations; + count = m.get(location); + } + + if (count != null) { + if (count.decrementAndGet() == 0) { + m.remove(location); + } + } + } + + Set<String> getNodeLocations() { + return nodeLocations.keySet(); + } + + Set<String> getRackLocations() { + return rackLocations.keySet(); + } + } /** * Create a new Opportunistic Container Allocator. * @param tokenSecretManager TokenSecretManager @@ -223,37 +315,55 @@ public class OpportunisticContainerAllocator { // Add OPPORTUNISTIC requests to the outstanding ones. opportContext.addToOutstandingReqs(oppResourceReqs); - // Satisfy the outstanding OPPORTUNISTIC requests. + Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist()); List<Container> allocatedContainers = new ArrayList<>(); - for (SchedulerRequestKey schedulerKey : - opportContext.getOutstandingOpReqs().descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given cap (the actual container size - // might be different than what is requested, which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map<Resource, List<Container>> allocated = allocate(rmIdentifier, - opportContext, schedulerKey, applicationAttemptId, appSubmitter); - for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { - opportContext.matchAllocationToOutstandingRequest( - e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); + + // Satisfy the outstanding OPPORTUNISTIC requests. 
+ boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map<Resource, List<Allocation>> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + for (Map<Resource, List<Allocation>> allocation : allocations) { + for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) { + opportContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); + } + } } } return allocatedContainers; } - private Map<Resource, List<Container>> allocate(long rmIdentifier, + private Map<Resource, List<Allocation>> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName) throws YarnException { - Map<Resource, List<Container>> containers = new HashMap<>(); - for (ResourceRequest anyAsk : + ApplicationAttemptId appAttId, String userName, Set<String> blackList) + throws YarnException { + Map<Resource, List<Allocation>> containers = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), appContext.getBlacklist(), - appAttId, appContext.getNodeMap(), userName, containers, anyAsk); + appContext.getContainerIdGenerator(), blackList, appAttId, + appContext.getNodeMap(), userName, containers, enrichedAsk); + ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " + "allocationRequestId={}, num_containers={}, capability={}] " @@ -269,43 +379,162 @@ public class OpportunisticContainerAllocator { AllocationParams appParams, ContainerIdGenerator idCounter, Set<String> blacklist, ApplicationAttemptId id, Map<String, RemoteNode> allNodes, String userName, - Map<Resource, List<Container>> containers, ResourceRequest anyAsk) + Map<Resource, List<Allocation>> allocations, + EnrichedResourceRequest enrichedAsk) throws YarnException { + if (allNodes.size() == 0) { + LOG.info("No nodes currently available to " + + "allocate OPPORTUNISTIC containers."); + return; + } + ResourceRequest anyAsk = enrichedAsk.getRequest(); int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? 0 : - containers.get(anyAsk.getCapability()).size()); - - List<RemoteNode> nodesForScheduling = new ArrayList<>(); - for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) { - // Do not use blacklisted nodes for scheduling. - if (blacklist.contains(nodeEntry.getKey())) { - continue; + - (allocations.isEmpty() ? 
0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + int numAllocated = 0; + // Node Candidates are selected as follows: + // * Node local candidates selected in loop == 0 + // * Rack local candidates selected in loop == 1 + // * From loop == 2 onwards, we revert to off switch allocations. + int loopIndex = OFF_SWITCH_LOOP; + if (enrichedAsk.getNodeLocations().size() > 0) { + loopIndex = NODE_LOCAL_LOOP; + } + while (numAllocated < toAllocate) { + Collection<RemoteNode> nodeCandidates = + findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk); + for (RemoteNode rNode : nodeCandidates) { + String rNodeHost = rNode.getNodeId().getHost(); + // Ignore black list + if (blacklist.contains(rNodeHost)) { + LOG.info("Nodes for scheduling has a blacklisted node" + + " [" + rNodeHost + "].."); + continue; + } + String location = ResourceRequest.ANY; + if (loopIndex == NODE_LOCAL_LOOP) { + if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { + location = rNodeHost; + } else { + continue; + } + } + if (loopIndex == RACK_LOCAL_LOOP) { + if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { + location = rNode.getRackName(); + } else { + continue; + } + } + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, location, + anyAsk, rNode); + numAllocated++; + // Try to spread the allocations across the nodes. + // But don't add if it is a node local request. + if (loopIndex != NODE_LOCAL_LOOP) { + blacklist.add(rNode.getNodeId().getHost()); + } + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + location + "]"); + if (numAllocated >= toAllocate) { + break; + } + } + if (loopIndex == NODE_LOCAL_LOOP && + enrichedAsk.getRackLocations().size() > 0) { + loopIndex = RACK_LOCAL_LOOP; + } else { + loopIndex++; + } + // Handle case where there are no nodes remaining after blacklist is + // considered. + if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { + LOG.warn("Unable to allocate any opportunistic containers."); + break; } - nodesForScheduling.add(nodeEntry.getValue()); } - if (nodesForScheduling.isEmpty()) { - LOG.warn("No nodes available for allocating opportunistic containers. [" + - "allNodes={}, blacklist={}]", allNodes, blacklist); - return; + } + + private Collection<RemoteNode> findNodeCandidates(int loopIndex, + Map<String, RemoteNode> allNodes, Set<String> blackList, + EnrichedResourceRequest enrichedRR) { + if (loopIndex > 1) { + return allNodes.values(); + } else { + LinkedList<RemoteNode> retList = new LinkedList<>(); + int numContainers = enrichedRR.getRequest().getNumContainers(); + while (numContainers > 0) { + if (loopIndex == 0) { + // Node local candidates + numContainers = collectNodeLocalCandidates( + allNodes, enrichedRR, retList, numContainers); + } else { + // Rack local candidates + numContainers = collectRackLocalCandidates( + allNodes, enrichedRR, retList, blackList, numContainers); + } + if (numContainers == enrichedRR.getRequest().getNumContainers()) { + // If there is no change in numContainers, then there is no point + // in looping again. 
+ break; + } + } + return retList; } - int numAllocated = 0; - int nextNodeToSchedule = 0; - for (int numCont = 0; numCont < toAllocate; numCont++) { - nextNodeToSchedule++; - nextNodeToSchedule %= nodesForScheduling.size(); - RemoteNode node = nodesForScheduling.get(nextNodeToSchedule); - Container container = buildContainer(rmIdentifier, appParams, idCounter, - anyAsk, id, userName, node); - List<Container> cList = containers.get(anyAsk.getCapability()); - if (cList == null) { - cList = new ArrayList<>(); - containers.put(anyAsk.getCapability(), cList); + } + + private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes, + EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList, + Set<String> blackList, int numContainers) { + for (RemoteNode rNode : allNodes.values()) { + if (enrichedRR.getRackLocations().contains(rNode.getRackName())) { + if (blackList.contains(rNode.getNodeId().getHost())) { + retList.addLast(rNode); + } else { + retList.addFirst(rNode); + numContainers--; + } + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes, + EnrichedResourceRequest enrichedRR, List<RemoteNode> retList, + int numContainers) { + for (String nodeName : enrichedRR.getNodeLocations()) { + RemoteNode remoteNode = allNodes.get(nodeName); + if (remoteNode != null) { + retList.add(remoteNode); + numContainers--; } - cList.add(container); - numAllocated++; - LOG.info("Allocated [{}] as opportunistic.", container.getId()); + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private Container createContainer(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + ApplicationAttemptId id, String userName, + Map<Resource, List<Allocation>> allocations, String location, + ResourceRequest anyAsk, RemoteNode rNode) throws YarnException { + Container container = buildContainer(rmIdentifier, appParams, + idCounter, anyAsk, id, userName, rNode); + List<Allocation> allocList = allocations.get(anyAsk.getCapability()); + if (allocList == null) { + allocList = new ArrayList<>(); + allocations.put(anyAsk.getCapability(), allocList); } - LOG.info("Allocated {} opportunistic containers.", numAllocated); + allocList.add(new Allocation(container, location)); + return container; } private Container buildContainer(long rmIdentifier, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 1b1c5b9..246d450 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.scheduler; -import org.apache.hadoop.yarn.api.records.Container; 
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; @@ -35,8 +34,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest; /** * This encapsulates application specific information used by the @@ -53,7 +54,8 @@ public class OpportunisticContainerContext { new ContainerIdGenerator(); private volatile List<RemoteNode> nodeList = new LinkedList<>(); - private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>(); + private final LinkedHashMap<String, RemoteNode> nodeMap = + new LinkedHashMap<>(); private final Set<String> blacklist = new HashSet<>(); @@ -61,7 +63,8 @@ public class OpportunisticContainerContext { // Resource Name (host/rack/any) and capability. This mapping is required // to match a received Container to an outstanding OPPORTUNISTIC // ResourceRequest (ask). - private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> + private final TreeMap + <SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>> outstandingOpReqs = new TreeMap<>(); public AllocationParams getAppParams() { @@ -107,7 +110,7 @@ public class OpportunisticContainerContext { return blacklist; } - public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> + public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>> getOutstandingOpReqs() { return outstandingOpReqs; } @@ -125,36 +128,32 @@ public class OpportunisticContainerContext { for (ResourceRequest request : resourceAsks) { SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - // TODO: Extend for Node/Rack locality. 
We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map<Resource, ResourceRequest> reqMap = + Map<Resource, EnrichedResourceRequest> reqMap = outstandingOpReqs.get(schedulerKey); if (reqMap == null) { reqMap = new HashMap<>(); outstandingOpReqs.put(schedulerKey, reqMap); } - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); + EnrichedResourceRequest eReq = reqMap.get(request.getCapability()); + if (eReq == null) { + eReq = new EnrichedResourceRequest(request); + reqMap.put(request.getCapability(), eReq); + } + // Set numContainers only for ANY request + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + eReq.getRequest().setResourceName(ResourceRequest.ANY); + eReq.getRequest().setNumContainers(request.getNumContainers()); } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); + eReq.addLocation(request.getResourceName(), request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { LOG.info("# of outstandingOpReqs in ANY (at " + "priority = " + schedulerKey.getPriority() + ", allocationReqId = " + schedulerKey.getAllocationRequestId() + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); + + ", with location = " + request.getResourceName() + " ) : " + + ", numContainers = " + eReq.getRequest().getNumContainers()); } } } @@ -163,25 +162,34 @@ public class OpportunisticContainerContext { * This method matches a returned list of Container Allocations to any * outstanding OPPORTUNISTIC ResourceRequest. * @param capability Capability - * @param allocatedContainers Allocated Containers + * @param allocations Allocations. 
*/ public void matchAllocationToOutstandingRequest(Resource capability, - List<Container> allocatedContainers) { - for (Container c : allocatedContainers) { + List<Allocation> allocations) { + for (OpportunisticContainerAllocator.Allocation allocation : allocations) { SchedulerRequestKey schedulerKey = - SchedulerRequestKey.extractFrom(c); - Map<Resource, ResourceRequest> asks = + SchedulerRequestKey.extractFrom(allocation.getContainer()); + Map<Resource, EnrichedResourceRequest> asks = outstandingOpReqs.get(schedulerKey); if (asks == null) { continue; } - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { + EnrichedResourceRequest err = asks.get(capability); + if (err != null) { + int numContainers = err.getRequest().getNumContainers(); + numContainers--; + err.getRequest().setNumContainers(numContainers); + if (numContainers == 0) { asks.remove(capability); + if (asks.size() == 0) { + outstandingOpReqs.remove(schedulerKey); + } + } else { + if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) { + err.removeLocation(allocation.getResourceName()); + } } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index e889cde..8e59f14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,6 +30,7 @@ import "yarn_service_protos.proto"; message RemoteNodeProto { optional NodeIdProto node_id = 1; optional string http_address = 2; + optional string rack_name = 3; } message RegisterDistributedSchedulingAMResponseProto {
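The allocator changes in this patch replace the earlier round-robin placement with a locality-aware pass over candidate nodes, using the rack name that RemoteNode and RemoteNodeProto now carry. The following is a simplified illustration of the visiting order only (node-local matches first, then rack-local, then off-switch); the committed allocator above additionally handles blacklisted hosts, the per-scheduler-key allocation cap, and decrementing of outstanding requests, and the class and method names here are illustrative.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;

// Illustrative sketch only; not the committed OpportunisticContainerAllocator code.
final class LocalityOrderSketch {

  // Orders candidate nodes the way the allocator visits them: hosts named in
  // the request first, then nodes whose rack is named in the request, then
  // all remaining (off-switch) nodes.
  static List<RemoteNode> orderCandidates(Collection<RemoteNode> allNodes,
      Set<String> requestedHosts, Set<String> requestedRacks) {
    List<RemoteNode> nodeLocal = new ArrayList<>();
    List<RemoteNode> rackLocal = new ArrayList<>();
    List<RemoteNode> offSwitch = new ArrayList<>();
    for (RemoteNode node : allNodes) {
      if (requestedHosts.contains(node.getNodeId().getHost())) {
        nodeLocal.add(node);
      } else if (node.getRackName() != null
          && requestedRacks.contains(node.getRackName())) {
        rackLocal.add(node);
      } else {
        offSwitch.add(node);
      }
    }
    List<RemoteNode> ordered = new ArrayList<>(nodeLocal);
    ordered.addAll(rackLocal);
    ordered.addAll(offSwitch);
    return ordered;
  }
}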