http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java new file mode 100644 index 0000000..bfb12ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; +import 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; + + + +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * <p>The DistributedScheduler runs on the NodeManager and is modeled as an + * <code>AMRMProxy</code> request interceptor. It is responsible for the + * following:</p> + * <ul> + * <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the + * response objects to extract instructions from the + * <code>ClusterMonitor</code> running on the ResourceManager to aid in making + * distributed scheduling decisions.</li> + * <li>Call the <code>OpportunisticContainerAllocator</code> to allocate + * containers for the outstanding OPPORTUNISTIC container requests.</li> + * </ul> + */ +public final class DistributedScheduler extends AbstractRequestInterceptor { + + static class PartitionedResourceRequests { + private List<ResourceRequest> guaranteed = new ArrayList<>(); + private List<ResourceRequest> opportunistic = new ArrayList<>(); + public List<ResourceRequest> getGuaranteed() { + return guaranteed; + } + public List<ResourceRequest> getOpportunistic() { + return opportunistic; + } + } + + static class DistributedSchedulerParams { + Resource maxResource; + Resource minResource; + Resource incrementResource; + int containerTokenExpiryInterval; + } + + private static final Logger LOG = LoggerFactory + .getLogger(DistributedScheduler.class); + + private final static RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + // Currently just used to keep track of allocated containers. + // Can be used for reporting stats later. + private Set<ContainerId> containersAllocated = new HashSet<>(); + + private DistributedSchedulerParams appParams = + new DistributedSchedulerParams(); + private final OpportunisticContainerAllocator.ContainerIdCounter + containerIdCounter = + new OpportunisticContainerAllocator.ContainerIdCounter(); + private Map<String, NodeId> nodeList = new LinkedHashMap<>(); + + // Mapping of NodeId to NodeTokens. Populated either from RM response or + // generated locally if required. + private Map<NodeId, NMToken> nodeTokens = new HashMap<>(); + final Set<String> blacklist = new HashSet<>(); + + // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, + // Resource Name (Host/rack/any) and capability. This mapping is required + // to match a received Container to an outstanding OPPORTUNISTIC + // ResourceRequest (ask). 
+ final TreeMap<Priority, Map<Resource, ResourceRequest>> + outstandingOpReqs = new TreeMap<>(); + + private ApplicationAttemptId applicationAttemptId; + private OpportunisticContainerAllocator containerAllocator; + private NMTokenSecretManagerInNM nmSecretManager; + private String appSubmitter; + + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + initLocal(appContext.getApplicationAttemptId(), + appContext.getNMCotext().getContainerAllocator(), + appContext.getNMCotext().getNMTokenSecretManager(), + appContext.getUser()); + } + + @VisibleForTesting + void initLocal(ApplicationAttemptId applicationAttemptId, + OpportunisticContainerAllocator containerAllocator, + NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { + this.applicationAttemptId = applicationAttemptId; + this.containerAllocator = containerAllocator; + this.nmSecretManager = nmSecretManager; + this.appSubmitter = appSubmitter; + } + + /** + * Route register call to the corresponding distributed scheduling method viz. + * registerApplicationMasterForDistributedScheduling, and return response to + * the caller after stripping away Distributed Scheduling information. + * + * @param request + * registration request + * @return Allocate Response + * @throws YarnException YarnException + * @throws IOException IOException + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return registerApplicationMasterForDistributedScheduling(request) + .getRegisterResponse(); + } + + /** + * Route allocate call to the allocateForDistributedScheduling method and + * return response to the caller after stripping away Distributed Scheduling + * information. + * + * @param request + * allocation request + * @return Allocate Response + * @throws YarnException YarnException + * @throws IOException IOException + */ + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + DistributedSchedulingAllocateRequest distRequest = RECORD_FACTORY + .newRecordInstance(DistributedSchedulingAllocateRequest.class); + distRequest.setAllocateRequest(request); + return allocateForDistributedScheduling(distRequest).getAllocateResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + /** + * Check if we already have a NMToken. 
if Not, generate the Token and + * add it to the response + */ + private void updateResponseWithNMTokens(AllocateResponse response, + List<NMToken> nmTokens, List<Container> allocatedContainers) { + List<NMToken> newTokens = new ArrayList<>(); + if (allocatedContainers.size() > 0) { + response.getAllocatedContainers().addAll(allocatedContainers); + for (Container alloc : allocatedContainers) { + if (!nodeTokens.containsKey(alloc.getNodeId())) { + newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc)); + } + } + List<NMToken> retTokens = new ArrayList<>(nmTokens); + retTokens.addAll(newTokens); + response.setNMTokens(retTokens); + } + } + + private PartitionedResourceRequests partitionAskList(List<ResourceRequest> + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; + } + + private void updateParameters( + RegisterDistributedSchedulingAMResponse registerResponse) { + appParams.minResource = registerResponse.getMinContainerResource(); + appParams.maxResource = registerResponse.getMaxContainerResource(); + appParams.incrementResource = + registerResponse.getIncrContainerResource(); + if (appParams.incrementResource == null) { + appParams.incrementResource = appParams.minResource; + } + appParams.containerTokenExpiryInterval = registerResponse + .getContainerTokenExpiryInterval(); + + containerIdCounter + .resetContainerIdCounter(registerResponse.getContainerIdStart()); + setNodeList(registerResponse.getNodesForScheduling()); + } + + /** + * Takes a list of ResourceRequests (asks), extracts the key information viz. + * (Priority, ResourceName, Capability) and adds to the outstanding + * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce + * the current YARN constraint that only a single ResourceRequest can exist at + * a give Priority and Capability. + * + * @param resourceAsks the list with the {@link ResourceRequest}s + */ + public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) { + for (ResourceRequest request : resourceAsks) { + Priority priority = request.getPriority(); + + // TODO: Extend for Node/Rack locality. We only handle ANY requests now + if (!ResourceRequest.isAnyLocation(request.getResourceName())) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map<Resource, ResourceRequest> reqMap = + this.outstandingOpReqs.get(priority); + if (reqMap == null) { + reqMap = new HashMap<>(); + this.outstandingOpReqs.put(priority, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + + ", with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + /** + * This method matches a returned list of Container Allocations to any + * outstanding OPPORTUNISTIC ResourceRequest. 
+ */ + private void matchAllocationToOutstandingRequest(Resource capability, + List<Container> allocatedContainers) { + for (Container c : allocatedContainers) { + containersAllocated.add(c.getId()); + Map<Resource, ResourceRequest> asks = + outstandingOpReqs.get(c.getPriority()); + + if (asks == null) + continue; + + ResourceRequest rr = asks.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) { + asks.remove(capability); + } + } + } + } + + private void setNodeList(List<NodeId> nodeList) { + this.nodeList.clear(); + addToNodeList(nodeList); + } + + private void addToNodeList(List<NodeId> nodes) { + for (NodeId n : nodes) { + this.nodeList.put(n.getHost(), n); + } + } + + @Override + public RegisterDistributedSchedulingAMResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + LOG.info("Forwarding registration request to the" + + "Distributed Scheduler Service on YARN RM"); + RegisterDistributedSchedulingAMResponse dsResp = getNextInterceptor() + .registerApplicationMasterForDistributedScheduling(request); + updateParameters(dsResp); + return dsResp; + } + + @Override + public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( + DistributedSchedulingAllocateRequest request) + throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } + // Partition requests into GUARANTEED and OPPORTUNISTIC reqs + PartitionedResourceRequests partitionedAsks = + partitionAskList(request.getAllocateRequest().getAskList()); + + List<ContainerId> releasedContainers = + request.getAllocateRequest().getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + applicationAttemptId + " released: " + + numReleasedContainers); + containersAllocated.removeAll(releasedContainers); + } + + // Also, update black list + ResourceBlacklistRequest rbr = + request.getAllocateRequest().getResourceBlacklistRequest(); + if (rbr != null) { + blacklist.removeAll(rbr.getBlacklistRemovals()); + blacklist.addAll(rbr.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC reqs to the outstanding reqs + addToOutstandingReqs(partitionedAsks.getOpportunistic()); + + List<Container> allocatedContainers = new ArrayList<>(); + for (Priority priority : outstandingOpReqs.descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given Cap (The actual container size + // might be different than what is requested.. 
which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map<Resource, List<Container>> allocated = + containerAllocator.allocate(this.appParams, containerIdCounter, + outstandingOpReqs.get(priority).values(), blacklist, + applicationAttemptId, nodeList, appSubmitter); + for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { + matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + + request.setAllocatedContainers(allocatedContainers); + + // Send all the GUARANTEED Reqs to RM + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); + DistributedSchedulingAllocateResponse dsResp = + getNextInterceptor().allocateForDistributedScheduling(request); + + // Update host to nodeId mapping + setNodeList(dsResp.getNodesForScheduling()); + List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens(); + for (NMToken nmToken : nmTokens) { + nodeTokens.put(nmToken.getNodeId(), nmToken); + } + + List<ContainerStatus> completedContainers = + dsResp.getAllocateResponse().getCompletedContainersStatuses(); + + // Only account for opportunistic containers + for (ContainerStatus cs : completedContainers) { + if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + containersAllocated.remove(cs.getContainerId()); + } + } + + // Check if we have NM tokens for all the allocated containers. If not + // generate one and update the response. + updateResponseWithNMTokens( + dsResp.getAllocateResponse(), nmTokens, allocatedContainers); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Number of opportunistic containers currently allocated by" + + "application: " + containersAllocated.size()); + } + return dsResp; + } +}
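The core of the new DistributedScheduler above is the bookkeeping around outstandingOpReqs: OPPORTUNISTIC asks are keyed first by Priority and then by requested capability, asks arriving with the same key have their container counts merged, and every container handed back decrements the matching ask until it drops out of the map. The standalone sketch below illustrates that add/match cycle using the YARN record types; it is an illustration of the pattern, not code from the patch, and the class and method names (OutstandingOpReqsSketch, addAsk, matchAllocation) are invented for this example.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;

    public class OutstandingOpReqsSketch {
      // Same shape as DistributedScheduler#outstandingOpReqs:
      // Priority -> (requested capability -> ask).
      private final TreeMap<Priority, Map<Resource, ResourceRequest>> outstanding =
          new TreeMap<>();

      // Record an OPPORTUNISTIC ask; asks at the same priority and capability are merged.
      void addAsk(ResourceRequest ask) {
        Map<Resource, ResourceRequest> byCapability =
            outstanding.computeIfAbsent(ask.getPriority(), p -> new HashMap<>());
        ResourceRequest existing = byCapability.get(ask.getCapability());
        if (existing == null) {
          byCapability.put(ask.getCapability(), ask);
        } else {
          existing.setNumContainers(
              existing.getNumContainers() + ask.getNumContainers());
        }
      }

      // Match one allocated container of the given capability against the outstanding asks.
      void matchAllocation(Priority priority, Resource capability) {
        Map<Resource, ResourceRequest> byCapability = outstanding.get(priority);
        if (byCapability == null) {
          return;
        }
        ResourceRequest ask = byCapability.get(capability);
        if (ask != null) {
          ask.setNumContainers(ask.getNumContainers() - 1);
          if (ask.getNumContainers() == 0) {
            byCapability.remove(capability);
          }
        }
      }

      public static void main(String[] args) {
        OutstandingOpReqsSketch sketch = new OutstandingOpReqsSketch();
        // Two OPPORTUNISTIC containers of 1024 MB / 4 vcores at priority 100
        // (the same values the test below uses).
        sketch.addAsk(ResourceRequest.newInstance(Priority.newInstance(100),
            ResourceRequest.ANY, Resource.newInstance(1024, 4), 2));
        // One container comes back: the outstanding count drops from 2 to 1.
        sketch.matchAllocation(Priority.newInstance(100), Resource.newInstance(1024, 4));
      }
    }

The scheduler in the patch performs the same bookkeeping, but against asks arriving through the AMRMProxy interceptor chain and containers returned by the OpportunisticContainerAllocator.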
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java deleted file mode 100644 index ec0e8a4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ /dev/null @@ -1,432 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.scheduler; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.nodemanager.amrmproxy - .AMRMProxyApplicationContext; -import 
org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; - - - -import org.apache.hadoop.yarn.server.nodemanager.security - .NMTokenSecretManagerInNM; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -/** - * <p>The LocalScheduler runs on the NodeManager and is modelled as an - * <code>AMRMProxy</code> request interceptor. It is responsible for the - * following :</p> - * <ul> - * <li>Intercept <code>ApplicationMasterProtocol</code> calls and unwrap the - * response objects to extract instructions from the - * <code>ClusterManager</code> running on the ResourceManager to aid in making - * Scheduling scheduling decisions</li> - * <li>Call the <code>OpportunisticContainerAllocator</code> to allocate - * containers for the opportunistic resource outstandingOpReqs</li> - * </ul> - */ -public final class LocalScheduler extends AbstractRequestInterceptor { - - static class PartitionedResourceRequests { - private List<ResourceRequest> guaranteed = new ArrayList<>(); - private List<ResourceRequest> opportunistic = new ArrayList<>(); - public List<ResourceRequest> getGuaranteed() { - return guaranteed; - } - public List<ResourceRequest> getOpportunistic() { - return opportunistic; - } - } - - static class DistSchedulerParams { - Resource maxResource; - Resource minResource; - Resource incrementResource; - int containerTokenExpiryInterval; - } - - private static final Logger LOG = LoggerFactory - .getLogger(LocalScheduler.class); - - private final static RecordFactory RECORD_FACTORY = - RecordFactoryProvider.getRecordFactory(null); - - // Currently just used to keep track of allocated Containers - // Can be used for reporting stats later - private Set<ContainerId> containersAllocated = new HashSet<>(); - - private DistSchedulerParams appParams = new DistSchedulerParams(); - private final OpportunisticContainerAllocator.ContainerIdCounter containerIdCounter = - new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map<String, NodeId> nodeList = new LinkedHashMap<>(); - - // Mapping of NodeId to NodeTokens. Populated either from RM response or - // generated locally if required. - private Map<NodeId, NMToken> nodeTokens = new HashMap<>(); - final Set<String> blacklist = new HashSet<>(); - - // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, - // Resource Name (Host/rack/any) and capability. 
This mapping is required - // to match a received Container to an outstanding OPPORTUNISTIC - // ResourceRequests (ask) - final TreeMap<Priority, Map<Resource, ResourceRequest>> - outstandingOpReqs = new TreeMap<>(); - - private ApplicationAttemptId applicationAttemptId; - private OpportunisticContainerAllocator containerAllocator; - private NMTokenSecretManagerInNM nmSecretManager; - private String appSubmitter; - - public void init(AMRMProxyApplicationContext appContext) { - super.init(appContext); - initLocal(appContext.getApplicationAttemptId(), - appContext.getNMCotext().getContainerAllocator(), - appContext.getNMCotext().getNMTokenSecretManager(), - appContext.getUser()); - } - - @VisibleForTesting - void initLocal(ApplicationAttemptId applicationAttemptId, - OpportunisticContainerAllocator containerAllocator, - NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { - this.applicationAttemptId = applicationAttemptId; - this.containerAllocator = containerAllocator; - this.nmSecretManager = nmSecretManager; - this.appSubmitter = appSubmitter; - } - - /** - * Route register call to the corresponding distributed scheduling method viz. - * registerApplicationMasterForDistributedScheduling, and return response to - * the caller after stripping away Distributed Scheduling information. - * - * @param request - * registration request - * @return Allocate Response - * @throws YarnException - * @throws IOException - */ - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { - return registerApplicationMasterForDistributedScheduling(request) - .getRegisterResponse(); - } - - /** - * Route allocate call to the allocateForDistributedScheduling method and - * return response to the caller after stripping away Distributed Scheduling - * information. - * - * @param request - * allocation request - * @return Allocate Response - * @throws YarnException - * @throws IOException - */ - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - DistSchedAllocateRequest distRequest = - RECORD_FACTORY.newRecordInstance(DistSchedAllocateRequest.class); - distRequest.setAllocateRequest(request); - return allocateForDistributedScheduling(distRequest).getAllocateResponse(); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - return getNextInterceptor().finishApplicationMaster(request); - } - - /** - * Check if we already have a NMToken. 
if Not, generate the Token and - * add it to the response - * @param response - * @param nmTokens - * @param allocatedContainers - */ - private void updateResponseWithNMTokens(AllocateResponse response, - List<NMToken> nmTokens, List<Container> allocatedContainers) { - List<NMToken> newTokens = new ArrayList<>(); - if (allocatedContainers.size() > 0) { - response.getAllocatedContainers().addAll(allocatedContainers); - for (Container alloc : allocatedContainers) { - if (!nodeTokens.containsKey(alloc.getNodeId())) { - newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc)); - } - } - List<NMToken> retTokens = new ArrayList<>(nmTokens); - retTokens.addAll(newTokens); - response.setNMTokens(retTokens); - } - } - - private PartitionedResourceRequests partitionAskList(List<ResourceRequest> - askList) { - PartitionedResourceRequests partitionedRequests = - new PartitionedResourceRequests(); - for (ResourceRequest rr : askList) { - if (rr.getExecutionTypeRequest().getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - partitionedRequests.getOpportunistic().add(rr); - } else { - partitionedRequests.getGuaranteed().add(rr); - } - } - return partitionedRequests; - } - - private void updateParameters( - DistSchedRegisterResponse registerResponse) { - appParams.minResource = registerResponse.getMinAllocatableCapabilty(); - appParams.maxResource = registerResponse.getMaxAllocatableCapabilty(); - appParams.incrementResource = - registerResponse.getIncrAllocatableCapabilty(); - if (appParams.incrementResource == null) { - appParams.incrementResource = appParams.minResource; - } - appParams.containerTokenExpiryInterval = registerResponse - .getContainerTokenExpiryInterval(); - - containerIdCounter - .resetContainerIdCounter(registerResponse.getContainerIdStart()); - setNodeList(registerResponse.getNodesForScheduling()); - } - - /** - * Takes a list of ResourceRequests (asks), extracts the key information viz. - * (Priority, ResourceName, Capability) and adds it the outstanding - * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce - * the current YARN constraint that only a single ResourceRequest can exist at - * a give Priority and Capability - * @param resourceAsks - */ - public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) { - for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); - - // TODO: Extend for Node/Rack locality. 
We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map<Resource, ResourceRequest> reqMap = - this.outstandingOpReqs.get(priority); - if (reqMap == null) { - reqMap = new HashMap<>(); - this.outstandingOpReqs.put(priority, reqMap); - } - - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); - } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); - } - if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority - + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); - } - } - } - - /** - * This method matches a returned list of Container Allocations to any - * outstanding OPPORTUNISTIC ResourceRequest - * @param capability - * @param allocatedContainers - */ - public void matchAllocationToOutstandingRequest(Resource capability, - List<Container> allocatedContainers) { - for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); - Map<Resource, ResourceRequest> asks = - outstandingOpReqs.get(c.getPriority()); - - if (asks == null) - continue; - - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { - asks.remove(capability); - } - } - } - } - - private void setNodeList(List<NodeId> nodeList) { - this.nodeList.clear(); - addToNodeList(nodeList); - } - - private void addToNodeList(List<NodeId> nodes) { - for (NodeId n : nodes) { - this.nodeList.put(n.getHost(), n); - } - } - - @Override - public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling( - RegisterApplicationMasterRequest request) - throws YarnException, IOException { - LOG.info("Forwarding registration request to the" + - "Distributed Scheduler Service on YARN RM"); - DistSchedRegisterResponse dsResp = getNextInterceptor() - .registerApplicationMasterForDistributedScheduling(request); - updateParameters(dsResp); - return dsResp; - } - - @Override - public DistSchedAllocateResponse allocateForDistributedScheduling( - DistSchedAllocateRequest request) throws YarnException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); - } - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = partitionAskList( - request.getAllocateRequest().getAskList()); - - List<ContainerId> releasedContainers = - request.getAllocateRequest().getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - containersAllocated.removeAll(releasedContainers); - } - - // Also, update black list - ResourceBlacklistRequest rbr = - request.getAllocateRequest().getResourceBlacklistRequest(); - if (rbr != null) { - blacklist.removeAll(rbr.getBlacklistRemovals()); - blacklist.addAll(rbr.getBlacklistAdditions()); - } - - // Add OPPORTUNISTIC reqs to the outstanding reqs - addToOutstandingReqs(partitionedAsks.getOpportunistic()); - - List<Container> allocatedContainers = new ArrayList<>(); - for (Priority priority : 
outstandingOpReqs.descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given Cap (The actual container size - // might be different than what is requested.. which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map<Resource, List<Container>> allocated = - containerAllocator.allocate(this.appParams, containerIdCounter, - outstandingOpReqs.get(priority).values(), blacklist, - applicationAttemptId, nodeList, appSubmitter); - for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { - matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); - } - } - request.setAllocatedContainers(allocatedContainers); - - // Send all the GUARANTEED Reqs to RM - request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); - DistSchedAllocateResponse dsResp = - getNextInterceptor().allocateForDistributedScheduling(request); - - // Update host to nodeId mapping - setNodeList(dsResp.getNodesForScheduling()); - List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens(); - for (NMToken nmToken : nmTokens) { - nodeTokens.put(nmToken.getNodeId(), nmToken); - } - - List<ContainerStatus> completedContainers = - dsResp.getAllocateResponse().getCompletedContainersStatuses(); - - // Only account for opportunistic containers - for (ContainerStatus cs : completedContainers) { - if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); - } - } - - // Check if we have NM tokens for all the allocated containers. If not - // generate one and update the response. - updateResponseWithNMTokens( - dsResp.getAllocateResponse(), nmTokens, allocatedContainers); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Number of opportunistic containers currently allocated by" + - "application: " + containersAllocated.size()); - } - return dsResp; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index 22a6a24..ce5bda0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler.DistSchedulerParams; +import 
org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -37,15 +37,17 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.net.InetSocketAddress; import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; /** - * <p>The OpportunisticContainerAllocator allocates containers on a given list - * of Nodes after it modifies the container sizes to within allowable limits - * specified by the <code>ClusterManager</code> running on the RM. It tries to - * distribute the containers as evenly as possible. It also uses the - * <code>NMTokenSecretManagerInNM</code> to generate the required NM tokens for - * the allocated containers</p> + * <p> + * The OpportunisticContainerAllocator allocates containers on a given list of + * nodes, after modifying the container sizes to respect the limits set by the + * ResourceManager. It tries to distribute the containers as evenly as possible. + * It also uses the <code>NMTokenSecretManagerInNM</code> to generate the + * required NM tokens for the allocated containers. + * </p> */ public class OpportunisticContainerAllocator { @@ -78,15 +80,15 @@ public class OpportunisticContainerAllocator { this.webpagePort = webpagePort; } - public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams, - ContainerIdCounter idCounter, Collection<ResourceRequest> resourceAsks, - Set<String> blacklist, ApplicationAttemptId appAttId, - Map<String, NodeId> allNodes, String userName) throws YarnException { + public Map<Resource, List<Container>> allocate( + DistributedSchedulerParams appParams, ContainerIdCounter idCounter, + Collection<ResourceRequest> resourceAsks, Set<String> blacklist, + ApplicationAttemptId appAttId, Map<String, NodeId> allNodes, + String userName) throws YarnException { Map<Resource, List<Container>> containers = new HashMap<>(); - Set<String> nodesAllocated = new HashSet<>(); for (ResourceRequest anyAsk : resourceAsks) { allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, - allNodes, userName, containers, nodesAllocated, anyAsk); + allNodes, userName, containers, anyAsk); LOG.info("Opportunistic allocation requested for [" + "priority=" + anyAsk.getPriority() + ", num_containers=" + anyAsk.getNumContainers() @@ -96,30 +98,30 @@ public class OpportunisticContainerAllocator { return containers; } - private void allocateOpportunisticContainers(DistSchedulerParams appParams, - ContainerIdCounter idCounter, Set<String> blacklist, - ApplicationAttemptId id, Map<String, NodeId> allNodes, String userName, - Map<Resource, List<Container>> containers, Set<String> nodesAllocated, - ResourceRequest anyAsk) throws YarnException { + private void allocateOpportunisticContainers( + DistributedSchedulerParams appParams, ContainerIdCounter idCounter, + Set<String> blacklist, ApplicationAttemptId id, + Map<String, NodeId> allNodes, String userName, + Map<Resource, List<Container>> containers, ResourceRequest anyAsk) + throws YarnException { int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? - 0 : containers.get(anyAsk.getCapability()).size()); + - (containers.isEmpty() ? 
0 : + containers.get(anyAsk.getCapability()).size()); - List<String> topKNodesLeft = new ArrayList<>(); - for (String s : allNodes.keySet()) { - // Bias away from whatever we have already allocated and respect blacklist - if (nodesAllocated.contains(s) || blacklist.contains(s)) { + List<NodeId> nodesForScheduling = new ArrayList<>(); + for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) { + // Do not use blacklisted nodes for scheduling. + if (blacklist.contains(nodeEntry.getKey())) { continue; } - topKNodesLeft.add(s); + nodesForScheduling.add(nodeEntry.getValue()); } int numAllocated = 0; - int nextNodeToAllocate = 0; + int nextNodeToSchedule = 0; for (int numCont = 0; numCont < toAllocate; numCont++) { - String topNode = topKNodesLeft.get(nextNodeToAllocate); - nextNodeToAllocate++; - nextNodeToAllocate %= topKNodesLeft.size(); - NodeId nodeId = allNodes.get(topNode); + nextNodeToSchedule++; + nextNodeToSchedule %= nodesForScheduling.size(); + NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); Container container = buildContainer(appParams, idCounter, anyAsk, id, userName, nodeId); List<Container> cList = containers.get(anyAsk.getCapability()); @@ -134,7 +136,7 @@ public class OpportunisticContainerAllocator { LOG.info("Allocated " + numAllocated + " opportunistic containers."); } - private Container buildContainer(DistSchedulerParams appParams, + private Container buildContainer(DistributedSchedulerParams appParams, ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, String userName, NodeId nodeId) throws YarnException { ContainerId cId = @@ -165,7 +167,7 @@ public class OpportunisticContainerAllocator { return container; } - private Resource normalizeCapability(DistSchedulerParams appParams, + private Resource normalizeCapability(DistributedSchedulerParams appParams, ResourceRequest ask) { return Resources.normalize(RESOURCE_CALCULATOR, ask.getCapability(), appParams.minResource, appParams.maxResource, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java new file mode 100644 index 0000000..b093b3b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Test cases for {@link DistributedScheduler}. 
+ */ +public class TestDistributedScheduler { + + @Test + public void testDistributedScheduler() throws Exception { + + Configuration conf = new Configuration(); + DistributedScheduler distributedScheduler = new DistributedScheduler(); + + RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler); + + registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList( + NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); + + final AtomicBoolean flipFlag = new AtomicBoolean(true); + Mockito.when( + finalReqIntcptr.allocateForDistributedScheduling( + Mockito.any(DistributedSchedulingAllocateRequest.class))) + .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() { + @Override + public DistributedSchedulingAllocateResponse answer( + InvocationOnMock invocationOnMock) throws Throwable { + flipFlag.set(!flipFlag.get()); + if (flipFlag.get()) { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("c", 3), NodeId.newInstance("d", 4))); + } else { + return createAllocateResponse(Arrays.asList( + NodeId.newInstance("d", 4), NodeId.newInstance("c", 3))); + } + } + }); + + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + ResourceRequest guaranteedReq = + createResourceRequest(ExecutionType.GUARANTEED, 5, "*"); + + ResourceRequest opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*"); + + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 4 containers were allocated + AllocateResponse allocateResponse = + distributedScheduler.allocate(allocateRequest); + Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); + + // Verify equal distribution on hosts a and b, and none on c or d + Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); + Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); + Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); + + // New Allocate request + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + + // Verify 6 containers were allocated + allocateResponse = distributedScheduler.allocate(allocateRequest); + Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); + + // Verify new containers are equally distribution on hosts c and d, + // and none on a or b + allocs = mapAllocs(allocateResponse, 6); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); + Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); + Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); + Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); + + // Ensure the DistributedScheduler respects the list order.. + // The first request should be allocated to "d" since it is ranked higher + // The second request should be allocated to "c" since the ranking is + // flipped on every allocate response. 
+ allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = distributedScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = distributedScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); + + allocateRequest = Records.newRecord(AllocateRequest.class); + opportunisticReq = + createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); + allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); + allocateResponse = distributedScheduler.allocate(allocateRequest); + allocs = mapAllocs(allocateResponse, 1); + Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); + } + + private void registerAM(DistributedScheduler distributedScheduler, + RequestInterceptor finalReqIntcptr, List<NodeId> nodeList) + throws Exception { + RegisterDistributedSchedulingAMResponse distSchedRegisterResponse = + Records.newRecord(RegisterDistributedSchedulingAMResponse.class); + distSchedRegisterResponse.setRegisterResponse( + Records.newRecord(RegisterApplicationMasterResponse.class)); + distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); + distSchedRegisterResponse.setContainerIdStart(0); + distSchedRegisterResponse.setMaxContainerResource( + Resource.newInstance(1024, 4)); + distSchedRegisterResponse.setMinContainerResource( + Resource.newInstance(512, 2)); + distSchedRegisterResponse.setNodesForScheduling(nodeList); + Mockito.when( + finalReqIntcptr.registerApplicationMasterForDistributedScheduling( + Mockito.any(RegisterApplicationMasterRequest.class))) + .thenReturn(distSchedRegisterResponse); + + distributedScheduler.registerApplicationMaster( + Records.newRecord(RegisterApplicationMasterRequest.class)); + } + + private RequestInterceptor setup(Configuration conf, + DistributedScheduler distributedScheduler) { + NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); + Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); + Context context = Mockito.mock(Context.class); + NMContainerTokenSecretManager nmContainerTokenSecretManager = new + NMContainerTokenSecretManager(conf); + MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + nmContainerTokenSecretManager.setMasterKey(mKey); + Mockito.when(context.getContainerTokenSecretManager()).thenReturn + (nmContainerTokenSecretManager); + OpportunisticContainerAllocator containerAllocator = + new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + + NMTokenSecretManagerInNM nmTokenSecretManagerInNM = + new NMTokenSecretManagerInNM(); + nmTokenSecretManagerInNM.setMasterKey(mKey); + distributedScheduler.initLocal( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), + containerAllocator, 
nmTokenSecretManagerInNM, "test"); + + RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class); + distributedScheduler.setNextInterceptor(finalReqIntcptr); + return finalReqIntcptr; + } + + private ResourceRequest createResourceRequest(ExecutionType execType, + int numContainers, String resourceName) { + ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); + opportunisticReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(execType, true)); + opportunisticReq.setNumContainers(numContainers); + opportunisticReq.setCapability(Resource.newInstance(1024, 4)); + opportunisticReq.setPriority(Priority.newInstance(100)); + opportunisticReq.setRelaxLocality(true); + opportunisticReq.setResourceName(resourceName); + return opportunisticReq; + } + + private DistributedSchedulingAllocateResponse createAllocateResponse( + List<NodeId> nodes) { + DistributedSchedulingAllocateResponse distSchedAllocateResponse = + Records.newRecord(DistributedSchedulingAllocateResponse.class); + distSchedAllocateResponse + .setAllocateResponse(Records.newRecord(AllocateResponse.class)); + distSchedAllocateResponse.setNodesForScheduling(nodes); + return distSchedAllocateResponse; + } + + private Map<NodeId, List<ContainerId>> mapAllocs( + AllocateResponse allocateResponse, int expectedSize) throws Exception { + Assert.assertEquals(expectedSize, + allocateResponse.getAllocatedContainers().size()); + Map<NodeId, List<ContainerId>> allocs = new HashMap<>(); + for (Container c : allocateResponse.getAllocatedContainers()) { + ContainerTokenIdentifier cTokId = BuilderUtils + .newContainerTokenIdentifier(c.getContainerToken()); + Assert.assertEquals( + c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), + cTokId.getNmHostAddress()); + List<ContainerId> cIds = allocs.get(c.getNodeId()); + if (cIds == null) { + cIds = new ArrayList<>(); + allocs.put(c.getNodeId(), cIds); + } + cIds.add(c.getId()); + } + return allocs; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java deleted file mode 100644 index 8de849b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.scheduler; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords - .RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; -import org.apache.hadoop.yarn.server.nodemanager.security - .NMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.nodemanager.security - .NMTokenSecretManagerInNM; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TestLocalScheduler { - - @Test - public void testLocalScheduler() throws Exception { - - Configuration conf = new Configuration(); - LocalScheduler localScheduler = new LocalScheduler(); - - RequestInterceptor finalReqIntcptr = setup(conf, localScheduler); - - registerAM(localScheduler, finalReqIntcptr, Arrays.asList( - NodeId.newInstance("a", 1), NodeId.newInstance("b", 2))); - - final AtomicBoolean flipFlag = new AtomicBoolean(false); - Mockito.when( - finalReqIntcptr.allocateForDistributedScheduling( - Mockito.any(DistSchedAllocateRequest.class))) - .thenAnswer(new Answer<DistSchedAllocateResponse>() { - @Override - public DistSchedAllocateResponse answer(InvocationOnMock - invocationOnMock) throws Throwable { - flipFlag.set(!flipFlag.get()); - if (flipFlag.get()) { - return createAllocateResponse(Arrays.asList( - NodeId.newInstance("c", 3), 
NodeId.newInstance("d", 4))); - } else { - return createAllocateResponse(Arrays.asList( - NodeId.newInstance("d", 4), NodeId.newInstance("c", 3))); - } - } - }); - - AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - ResourceRequest guaranteedReq = - createResourceRequest(ExecutionType.GUARANTEED, 5, "*"); - - ResourceRequest opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - - // Verify 4 containers were allocated - AllocateResponse allocateResponse = - localScheduler.allocate(allocateRequest); - Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size()); - - // Verify equal distribution on hosts a and b - // And None on c and d - Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size()); - Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size()); - Assert.assertNull(allocs.get(NodeId.newInstance("c", 3))); - Assert.assertNull(allocs.get(NodeId.newInstance("d", 4))); - - // New Allocate request - allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 6, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - - // Verify 6 containers were allocated - allocateResponse = localScheduler.allocate(allocateRequest); - Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size()); - - // Verify New containers are equally distribution on hosts c and d - // And None on a and b - allocs = mapAllocs(allocateResponse, 6); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size()); - Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size()); - Assert.assertNull(allocs.get(NodeId.newInstance("a", 1))); - Assert.assertNull(allocs.get(NodeId.newInstance("b", 2))); - - // Ensure the LocalScheduler respects the list order.. - // The first request should be allocated to "d" since it is ranked higher - // The second request should be allocated to "c" since the ranking is - // flipped on every allocate response. 
- allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - allocateResponse = localScheduler.allocate(allocateRequest); - allocs = mapAllocs(allocateResponse, 1); - Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); - - allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - allocateResponse = localScheduler.allocate(allocateRequest); - allocs = mapAllocs(allocateResponse, 1); - Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size()); - - allocateRequest = Records.newRecord(AllocateRequest.class); - opportunisticReq = - createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*"); - allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq)); - allocateResponse = localScheduler.allocate(allocateRequest); - allocs = mapAllocs(allocateResponse, 1); - Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size()); - } - - private void registerAM(LocalScheduler localScheduler, RequestInterceptor - finalReqIntcptr, List<NodeId> nodeList) throws Exception { - DistSchedRegisterResponse distSchedRegisterResponse = - Records.newRecord(DistSchedRegisterResponse.class); - distSchedRegisterResponse.setRegisterResponse( - Records.newRecord(RegisterApplicationMasterResponse.class)); - distSchedRegisterResponse.setContainerTokenExpiryInterval(12345); - distSchedRegisterResponse.setContainerIdStart(0); - distSchedRegisterResponse.setMaxAllocatableCapabilty( - Resource.newInstance(1024, 4)); - distSchedRegisterResponse.setMinAllocatableCapabilty( - Resource.newInstance(512, 2)); - distSchedRegisterResponse.setNodesForScheduling(nodeList); - Mockito.when( - finalReqIntcptr.registerApplicationMasterForDistributedScheduling( - Mockito.any(RegisterApplicationMasterRequest.class))) - .thenReturn(distSchedRegisterResponse); - - localScheduler.registerApplicationMaster( - Records.newRecord(RegisterApplicationMasterRequest.class)); - } - - private RequestInterceptor setup(Configuration conf, LocalScheduler - localScheduler) { - NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); - Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); - Context context = Mockito.mock(Context.class); - NMContainerTokenSecretManager nmContainerTokenSecretManager = new - NMContainerTokenSecretManager(conf); - MasterKey mKey = new MasterKey() { - @Override - public int getKeyId() { - return 1; - } - @Override - public void setKeyId(int keyId) {} - @Override - public ByteBuffer getBytes() { - return ByteBuffer.allocate(8); - } - @Override - public void setBytes(ByteBuffer bytes) {} - }; - nmContainerTokenSecretManager.setMasterKey(mKey); - Mockito.when(context.getContainerTokenSecretManager()).thenReturn - (nmContainerTokenSecretManager); - OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); - - NMTokenSecretManagerInNM nmTokenSecretManagerInNM = - new NMTokenSecretManagerInNM(); - nmTokenSecretManagerInNM.setMasterKey(mKey); - localScheduler.initLocal( - ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), - containerAllocator, nmTokenSecretManagerInNM, "test"); - - RequestInterceptor finalReqIntcptr = 
Mockito.mock(RequestInterceptor.class); - localScheduler.setNextInterceptor(finalReqIntcptr); - return finalReqIntcptr; - } - - private ResourceRequest createResourceRequest(ExecutionType execType, - int numContainers, String resourceName) { - ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(execType, true)); - opportunisticReq.setNumContainers(numContainers); - opportunisticReq.setCapability(Resource.newInstance(1024, 4)); - opportunisticReq.setPriority(Priority.newInstance(100)); - opportunisticReq.setRelaxLocality(true); - opportunisticReq.setResourceName(resourceName); - return opportunisticReq; - } - - private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) { - DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord - (DistSchedAllocateResponse.class); - distSchedAllocateResponse.setAllocateResponse( - Records.newRecord(AllocateResponse.class)); - distSchedAllocateResponse.setNodesForScheduling(nodes); - return distSchedAllocateResponse; - } - - private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse - allocateResponse, int expectedSize) throws Exception { - Assert.assertEquals(expectedSize, - allocateResponse.getAllocatedContainers().size()); - Map<NodeId, List<ContainerId>> allocs = new HashMap<>(); - for (Container c : allocateResponse.getAllocatedContainers()) { - ContainerTokenIdentifier cTokId = BuilderUtils - .newContainerTokenIdentifier(c.getContainerToken()); - Assert.assertEquals( - c.getNodeId().getHost() + ":" + c.getNodeId().getPort(), - cTokId.getNmHostAddress()); - List<ContainerId> cIds = allocs.get(c.getNodeId()); - if (cIds == null) { - cIds = new ArrayList<>(); - allocs.put(c.getNodeId(), cIds); - } - cIds.add(c.getId()); - } - return allocs; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5766b1d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java new file mode 100644 index 0000000..843ac09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; +import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; + + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The DistributedSchedulingAMService is started instead of the + * ApplicationMasterService if distributed scheduling is enabled for the YARN + * 
cluster. + * It extends the functionality of the ApplicationMasterService by servicing + * clients (AMs and AMRMProxy request interceptors) that understand the + * DistributedSchedulingProtocol. + */ +public class DistributedSchedulingAMService extends ApplicationMasterService + implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> { + + private static final Log LOG = + LogFactory.getLog(DistributedSchedulingAMService.class); + + private final NodeQueueLoadMonitor nodeMonitor; + + private final ConcurrentHashMap<String, Set<NodeId>> rackToNode = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, Set<NodeId>> hostToNode = + new ConcurrentHashMap<>(); + private final int k; + + public DistributedSchedulingAMService(RMContext rmContext, + YarnScheduler scheduler) { + super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler); + this.k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED, + YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT); + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeQueueLoadMonitor topKSelector = + new NodeQueueLoadMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; + } + + @Override + public Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this, + addr, serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + // To support application running on NMs that DO NOT support + // Dist Scheduling... 
The server multiplexes both the + // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol + ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + ApplicationMasterProtocolPB.class, + ApplicationMasterProtocolService.newReflectiveBlockingService( + new ApplicationMasterProtocolPBServiceImpl(this))); + return server; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return super.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return super.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return super.allocate(request); + } + + @Override + public RegisterDistributedSchedulingAMResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse response = + registerApplicationMaster(request); + RegisterDistributedSchedulingAMResponse dsResp = recordFactory + .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); + dsResp.setRegisterResponse(response); + dsResp.setMinContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB, + YarnConfiguration. + DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT) + ) + ); + dsResp.setMaxContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB, + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT) + ) + ); + dsResp.setIncrContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB, + YarnConfiguration. + DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES, + YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT) + ) + ); + dsResp.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, + YarnConfiguration. 
+ DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); + dsResp.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + + // Set nodes to be used for scheduling + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + @Override + public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( + DistributedSchedulingAllocateRequest request) + throws YarnException, IOException { + List<Container> distAllocContainers = request.getAllocatedContainers(); + for (Container container : distAllocContainers) { + // Create RMContainer + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new RMContainerImpl(container, + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, true); + appAttempt.addRMContainer(container.getId(), rmContainer); + rmContainer.handle( + new RMContainerEvent(container.getId(), + RMContainerEventType.LAUNCHED)); + } + AllocateResponse response = allocate(request.getAllocateRequest()); + DistributedSchedulingAllocateResponse dsResp = recordFactory + .newRecordInstance(DistributedSchedulingAllocateResponse.class); + dsResp.setAllocateResponse(response); + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + mapping.putIfAbsent(rackName, new HashSet<NodeId>()); + Set<NodeId> nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.add(nodeId); + } + } + } + + private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + Set<NodeId> nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.remove(nodeId); + } + } + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + break; + case 
NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingAMService: " + + event.toString()); + } + + } + + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } +}
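
For readers skimming the patch: the test helpers above show that an OPPORTUNISTIC ask differs from a regular one only in its ExecutionTypeRequest. Below is a minimal sketch of building such an ask, assembled from the createResourceRequest and allocate() usage in the tests; the 1 GB / 4 vcore capability, priority 100 and container count are illustrative values, not requirements.

    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.util.Records;

    import java.util.Collections;

    public final class OpportunisticAskSketch {
      public static AllocateRequest buildAsk() {
        // Ask for 4 OPPORTUNISTIC containers anywhere in the cluster ("*").
        ResourceRequest ask = Records.newRecord(ResourceRequest.class);
        ask.setExecutionTypeRequest(
            ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
        ask.setNumContainers(4);
        ask.setCapability(Resource.newInstance(1024, 4));  // 1 GB memory, 4 vcores
        ask.setPriority(Priority.newInstance(100));
        ask.setRelaxLocality(true);
        ask.setResourceName("*");                           // any host or rack

        AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
        allocateRequest.setAskList(Collections.singletonList(ask));
        return allocateRequest;
      }
    }

The tests send a GUARANTEED and an OPPORTUNISTIC request in the same ask list and assert that only the opportunistic containers come back allocated locally.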
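
The assertions in testLocalScheduler (two containers each on hosts "a" and "b", then three each on "c" and "d" once the RM returns a fresh node list, then alternation between "d" and "c" for single-container asks) pin down the allocation policy: opportunistic containers are spread round-robin over whatever nodesForScheduling list was last received, starting from the highest-ranked node. The sketch below is only an illustrative restatement of that policy, not the actual OpportunisticContainerAllocator implementation.

    import org.apache.hadoop.yarn.api.records.NodeId;

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative only: spread opportunistic containers over the current node list. */
    final class RoundRobinNodePicker {
      private final List<NodeId> nodes = new ArrayList<>();
      private int next = 0;

      /** Called whenever a register/allocate response carries a fresh nodesForScheduling list. */
      synchronized void refresh(List<NodeId> nodesForScheduling) {
        nodes.clear();
        nodes.addAll(nodesForScheduling);
        next = 0;  // the highest-ranked (least loaded) node is used first
      }

      /** Pick a target node for each of numContainers outstanding OPPORTUNISTIC containers. */
      synchronized List<NodeId> pick(int numContainers) {
        if (nodes.isEmpty()) {
          throw new IllegalStateException("No nodes available for scheduling");
        }
        List<NodeId> targets = new ArrayList<>(numContainers);
        for (int i = 0; i < numContainers; i++) {
          targets.add(nodes.get(next));
          next = (next + 1) % nodes.size();
        }
        return targets;
      }
    }

In the spirit of the test, refresh(Arrays.asList(a, b)) followed by pick(4) yields a, b, a, b, i.e. two containers per host; after refresh(Arrays.asList(d, c)), a single-container pick goes to "d".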
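
On the ResourceManager side, everything DistributedSchedulingAMService hands back, from the size of the nodesForScheduling list to the QueueLimitCalculator inputs, is read from YarnConfiguration in the constructor shown above. Below is a hedged sketch of setting those knobs programmatically: the constant names are taken from the patch, the numeric values are purely illustrative, and the exact threshold formula the QueueLimitCalculator derives from them is not restated here.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    final class DistSchedulingRmConfigSketch {
      static Configuration buildConf() {
        Configuration conf = new YarnConfiguration();
        // Return the 10 least-loaded nodes in every register/allocate response
        // (selectLeastLoadedNodes(k) above).
        conf.setInt(YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED, 10);
        // Re-sort the node list every 2 seconds.
        conf.setLong(
            YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 2000L);
        // Rank nodes by queued-container count rather than estimated queueing delay.
        conf.set(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, "QUEUE_LENGTH");
        // Inputs passed to initThresholdCalculator: the stdev multiplier plus the
        // min/max clamp that applies when the comparator is QUEUE_LENGTH.
        conf.setFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, 1.0f);
        conf.setInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, 5);
        conf.setInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, 15);
        return conf;
      }
    }

When the comparator is not QUEUE_LENGTH, the constructor instead clamps with the NM_CONTAINER_QUEUING_MIN/MAX_QUEUE_WAIT_TIME_MS settings, as the if/else in the patch shows.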