YARN-7900. [AMRMProxy] AMRMClientRelayer for stateful FederationInterceptor. (Botong Huang via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3159bffc Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3159bffc Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3159bffc Branch: refs/heads/trunk Commit: 3159bffce23abf35754da2d7d51de7d8c2631ae3 Parents: f749517 Author: Arun Suresh <asur...@apache.org> Authored: Thu May 17 20:00:52 2018 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Thu May 17 20:00:52 2018 -0700 ---------------------------------------------------------------------- .../yarn/client/api/impl/AMRMClientImpl.java | 151 ++------ .../hadoop/yarn/client/AMRMClientUtils.java | 262 +++++++++++++ .../hadoop/yarn/server/AMRMClientRelayer.java | 364 +++++++++++++++++++ .../failover/FederationProxyProviderUtil.java | 2 +- .../apache/hadoop/yarn/server/package-info.java | 18 + .../server/scheduler/ResourceRequestSet.java | 206 +++++++++++ .../server/scheduler/ResourceRequestSetKey.java | 133 +++++++ .../server/scheduler/SchedulerRequestKey.java | 4 +- .../yarn/server/uam/UnmanagedAMPoolManager.java | 2 +- .../server/uam/UnmanagedApplicationManager.java | 2 +- .../yarn/server/utils/AMRMClientUtils.java | 191 ---------- .../yarn/server/MockResourceManagerFacade.java | 2 +- .../yarn/server/TestAMRMClientRelayer.java | 275 ++++++++++++++ .../amrmproxy/FederationInterceptor.java | 2 +- .../ApplicationMasterService.java | 2 +- .../TestApplicationMasterLauncher.java | 2 +- 16 files changed, 1299 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index ef849b2..36c3cf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -31,11 +31,9 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.AbstractMap.SimpleEntry; -import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -68,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -113,13 +112,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { protected final Set<String> blacklistRemovals = new HashSet<String>(); private Map<Set<String>, PlacementConstraint> placementConstraints = new HashMap<>(); - private Queue<Collection<SchedulingRequest>> batchedSchedulingRequests = - new LinkedList<>(); - private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests = - new ConcurrentHashMap<>(); protected Map<String, Resource> resourceProfilesMap; - + static class 
ResourceRequestInfo<T> { ResourceRequest remoteRequest; LinkedHashSet<T> containerRequests; @@ -168,6 +163,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { SimpleEntry<Container, UpdateContainerRequest>> pendingChange = new HashMap<>(); + private List<SchedulingRequest> schedulingRequests = new ArrayList<>(); + private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests = + new HashMap<>(); + public AMRMClientImpl() { super(AMRMClientImpl.class.getName()); } @@ -252,18 +251,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { this.resourceProfilesMap = response.getResourceProfiles(); List<Container> prevContainers = response.getContainersFromPreviousAttempts(); - removeFromOutstandingSchedulingRequests(prevContainers); - recreateSchedulingRequestBatch(); + AMRMClientUtils.removeFromOutstandingSchedulingRequests(prevContainers, + this.outstandingSchedRequests); } return response; } @Override - public void addSchedulingRequests( - Collection<SchedulingRequest> schedulingRequests) { - synchronized (this.batchedSchedulingRequests) { - this.batchedSchedulingRequests.add(schedulingRequests); - } + public synchronized void addSchedulingRequests( + Collection<SchedulingRequest> newSchedulingRequests) { + this.schedulingRequests.addAll(newSchedulingRequests); + AMRMClientUtils.addToOutstandingSchedulingRequests(newSchedulingRequests, + this.outstandingSchedRequests); } @Override @@ -279,6 +278,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { List<String> blacklistToRemove = new ArrayList<String>(); Map<ContainerId, SimpleEntry<Container, UpdateContainerRequest>> oldChange = new HashMap<>(); + List<SchedulingRequest> schedulingRequestList = new LinkedList<>(); + try { synchronized (this) { askList = cloneAsks(); @@ -286,10 +287,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { oldChange.putAll(change); 
List<UpdateContainerRequest> updateList = createUpdateList(); releaseList = new ArrayList<ContainerId>(release); + schedulingRequestList = new ArrayList<>(schedulingRequests); + // optimistically clear this collection assuming no RPC failure ask.clear(); release.clear(); change.clear(); + schedulingRequests.clear(); blacklistToAdd.addAll(blacklistAdditions); blacklistToRemove.addAll(blacklistRemovals); @@ -301,8 +305,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { allocateRequest = AllocateRequest.newBuilder() .responseId(lastResponseId).progress(progressIndicator) .askList(askList).resourceBlacklistRequest(blacklistRequest) - .releaseList(releaseList).updateRequests(updateList).build(); - populateSchedulingRequests(allocateRequest); + .releaseList(releaseList).updateRequests(updateList) + .schedulingRequests(schedulingRequestList).build(); + // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -311,10 +316,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { try { allocateResponse = rmClient.allocate(allocateRequest); - removeFromOutstandingSchedulingRequests( - allocateResponse.getAllocatedContainers()); - removeFromOutstandingSchedulingRequests( - allocateResponse.getContainersFromPreviousAttempts()); } catch (ApplicationMasterNotRegisteredException e) { LOG.warn("ApplicationMaster is out of sync with ResourceManager," + " hence resyncing."); @@ -331,6 +332,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } } change.putAll(this.pendingChange); + for (List<SchedulingRequest> schedReqs : + this.outstandingSchedRequests.values()) { + this.schedulingRequests.addAll(schedReqs); + } } // re register with RM registerApplicationMaster(); @@ -370,6 +375,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { removePendingChangeRequests(changed); } } + 
AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getAllocatedContainers(), + this.outstandingSchedRequests); + AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts(), + this.outstandingSchedRequests); } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -410,108 +421,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } blacklistAdditions.addAll(blacklistToAdd); blacklistRemovals.addAll(blacklistToRemove); - } - } - } - return allocateResponse; - } - private void populateSchedulingRequests(AllocateRequest allocateRequest) { - synchronized (this.batchedSchedulingRequests) { - if (!this.batchedSchedulingRequests.isEmpty()) { - List<SchedulingRequest> newReqs = new LinkedList<>(); - Iterator<Collection<SchedulingRequest>> iter = - this.batchedSchedulingRequests.iterator(); - while (iter.hasNext()) { - Collection<SchedulingRequest> requests = iter.next(); - newReqs.addAll(requests); - addToOutstandingSchedulingRequests(requests); - iter.remove(); - } - allocateRequest.setSchedulingRequests(newReqs); - } - } - } - - private void recreateSchedulingRequestBatch() { - List<SchedulingRequest> batched = new ArrayList<>(); - synchronized (this.outstandingSchedRequests) { - for (List<SchedulingRequest> schedReqs : - this.outstandingSchedRequests.values()) { - batched.addAll(schedReqs); - } - } - synchronized (this.batchedSchedulingRequests) { - this.batchedSchedulingRequests.add(batched); - } - } - - private void addToOutstandingSchedulingRequests( - Collection<SchedulingRequest> requests) { - for (SchedulingRequest req : requests) { - List<SchedulingRequest> schedulingRequests = - this.outstandingSchedRequests.computeIfAbsent( - req.getAllocationTags(), x -> new LinkedList<>()); - SchedulingRequest matchingReq = null; - synchronized (schedulingRequests) { - for (SchedulingRequest schedReq : schedulingRequests) { - if 
(isMatching(req, schedReq)) { - matchingReq = schedReq; - break; - } - } - if (matchingReq != null) { - matchingReq.getResourceSizing().setNumAllocations( - req.getResourceSizing().getNumAllocations()); - } else { - schedulingRequests.add(req); - } - } - } - } - - private boolean isMatching(SchedulingRequest schedReq1, - SchedulingRequest schedReq2) { - return schedReq1.getPriority().equals(schedReq2.getPriority()) && - schedReq1.getExecutionType().getExecutionType().equals( - schedReq1.getExecutionType().getExecutionType()) && - schedReq1.getAllocationRequestId() == - schedReq2.getAllocationRequestId(); - } - - private void removeFromOutstandingSchedulingRequests( - Collection<Container> containers) { - if (containers == null || containers.isEmpty()) { - return; - } - for (Container container : containers) { - if (container.getAllocationTags() != null && - !container.getAllocationTags().isEmpty()) { - List<SchedulingRequest> schedReqs = - this.outstandingSchedRequests.get(container.getAllocationTags()); - if (schedReqs != null && !schedReqs.isEmpty()) { - synchronized (schedReqs) { - Iterator<SchedulingRequest> iter = schedReqs.iterator(); - while (iter.hasNext()) { - SchedulingRequest schedReq = iter.next(); - if (schedReq.getPriority().equals(container.getPriority()) && - schedReq.getAllocationRequestId() == - container.getAllocationRequestId()) { - int numAllocations = - schedReq.getResourceSizing().getNumAllocations(); - numAllocations--; - if (numAllocations == 0) { - iter.remove(); - } else { - schedReq.getResourceSizing() - .setNumAllocations(numAllocations); - } - } - } - } + schedulingRequests.addAll(schedulingRequestList); } } } + return allocateResponse; } private List<UpdateContainerRequest> createUpdateList() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java new file mode 100644 index 0000000..387e399 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for AMRMClient. 
+ */ +@Private +public final class AMRMClientUtils { + private static final Logger LOG = + LoggerFactory.getLogger(AMRMClientUtils.class); + + public static final String APP_ALREADY_REGISTERED_MESSAGE = + "Application Master is already registered : "; + + private AMRMClientUtils() { + } + + /** + * Handle ApplicationNotRegistered exception and re-register. + * + * @param appId application Id + * @param rmProxy RM proxy instance + * @param registerRequest the AM re-register request + * @throws YarnException if re-register fails + */ + public static void handleNotRegisteredExceptionAndReRegister( + ApplicationId appId, ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest) throws YarnException { + LOG.info("App attempt {} not registered, most likely due to RM failover. " + + " Trying to re-register.", appId); + try { + rmProxy.registerApplicationMaster(registerRequest); + } catch (Exception e) { + if (e instanceof InvalidApplicationMasterRequestException + && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) { + LOG.info("Concurrent thread successfully registered, moving on."); + } else { + LOG.error("Error trying to re-register AM", e); + throw new YarnException(e); + } + } + } + + /** + * Helper method for client calling ApplicationMasterProtocol.allocate that + * handles re-register if RM fails over. 
+ * + * @param request allocate request + * @param rmProxy RM proxy + * @param registerRequest the register request for re-register + * @param appId application id + * @return allocate response + * @throws YarnException if RM call fails + * @throws IOException if RM call fails + */ + public static AllocateResponse allocateWithReRegister(AllocateRequest request, + ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { + try { + return rmProxy.allocate(request); + } catch (ApplicationMasterNotRegisteredException e) { + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, + registerRequest); + // reset responseId after re-register + request.setResponseId(0); + // retry allocate + return allocateWithReRegister(request, rmProxy, registerRequest, appId); + } + } + + /** + * Helper method for client calling + * ApplicationMasterProtocol.finishApplicationMaster that handles re-register + * if RM fails over. + * + * @param request finishApplicationMaster request + * @param rmProxy RM proxy + * @param registerRequest the register request for re-register + * @param appId application id + * @return finishApplicationMaster response + * @throws YarnException if RM call fails + * @throws IOException if RM call fails + */ + public static FinishApplicationMasterResponse finishAMWithReRegister( + FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { + try { + return rmProxy.finishApplicationMaster(request); + } catch (ApplicationMasterNotRegisteredException ex) { + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, + registerRequest); + // retry finishAM after re-register + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); + } + } + + /** + * Create a proxy for the specified protocol. 
+ * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param user the user on whose behalf the proxy is being created + * @param token the auth token to use for connection + * @param <T> Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + public static <T> T createRMProxy(final Configuration configuration, + final Class<T> protocol, UserGroupInformation user, + final Token<? extends TokenIdentifier> token) throws IOException { + try { + String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + LOG.info("Creating RMProxy to RM {} for protocol {} for user {}", + rmClusterId, protocol.getSimpleName(), user); + if (token != null) { + // preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. 
+ // Same as YarnServerSecurityUtils.updateAMRMToken() + user.addToken(token); + token.setService(ClientRMProxy.getAMRMTokenService(configuration)); + setAuthModeInConf(configuration); + } + final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() { + @Override + public T run() throws Exception { + return ClientRMProxy.createRMProxy(configuration, protocol); + } + }); + return proxyConnection; + + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + private static void setAuthModeInConf(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + } + + /** + * Merge new scheduling requests into the outstanding-request map, which is + * keyed by allocation tags. If a request with the same priority, execution + * type and allocation request id is already outstanding for those tags, its + * numAllocations is overwritten with the new value; otherwise the new + * request is appended. This method does no locking of its own; callers must + * guard the map if it is shared. + * + * @param requests the newly submitted scheduling requests + * @param outstandingSchedRequests outstanding requests keyed by allocation + * tags, updated in place + */ + public static void addToOutstandingSchedulingRequests( + Collection<SchedulingRequest> requests, + Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) { + for (SchedulingRequest req : requests) { + List<SchedulingRequest> schedulingRequests = outstandingSchedRequests + .computeIfAbsent(req.getAllocationTags(), x -> new LinkedList<>()); + SchedulingRequest matchingReq = null; + for (SchedulingRequest schedReq : schedulingRequests) { + if (isMatchingSchedulingRequests(req, schedReq)) { + matchingReq = schedReq; + break; + } + } + if (matchingReq != null) { + matchingReq.getResourceSizing() + .setNumAllocations(req.getResourceSizing().getNumAllocations()); + } else { + schedulingRequests.add(req); + } + } + } + + /** + * Check whether two scheduling requests refer to the same logical ask, i.e. + * they have equal priority, equal execution type and the same allocation + * request id. + * + * @param schedReq1 first scheduling request + * @param schedReq2 second scheduling request + * @return true if priority, execution type and allocation request id match + */ + public static boolean isMatchingSchedulingRequests( + SchedulingRequest schedReq1, SchedulingRequest schedReq2) { + return schedReq1.getPriority().equals(schedReq2.getPriority()) && + schedReq1.getExecutionType().getExecutionType().equals( + schedReq2.getExecutionType().getExecutionType()) && + schedReq1.getAllocationRequestId() == + schedReq2.getAllocationRequestId(); + } + + /** + * Account for allocated containers against the outstanding scheduling + * requests. For every container with non-empty allocation tags, each + * outstanding request under the same tags with equal priority and the same + * allocation request id has its numAllocations decremented, and is removed + * once that count reaches zero. A null or empty container collection is a + * no-op. This method does no locking of its own; callers must guard the map + * if it is shared. + * + * @param containers the allocated containers, may be null + * @param outstandingSchedRequests outstanding requests keyed by allocation + * tags, updated in place + */ + public static void removeFromOutstandingSchedulingRequests( + Collection<Container> containers, + Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) { + if (containers == null ||
containers.isEmpty()) { + return; + } + for (Container container : containers) { + if (container.getAllocationTags() != null + && !container.getAllocationTags().isEmpty()) { + List<SchedulingRequest> schedReqs = + outstandingSchedRequests.get(container.getAllocationTags()); + if (schedReqs != null && !schedReqs.isEmpty()) { + Iterator<SchedulingRequest> iter = schedReqs.iterator(); + while (iter.hasNext()) { + SchedulingRequest schedReq = iter.next(); + if (schedReq.getPriority().equals(container.getPriority()) + && schedReq.getAllocationRequestId() == container + .getAllocationRequestId()) { + int numAllocations = + schedReq.getResourceSizing().getNumAllocations(); + numAllocations--; + if (numAllocations == 0) { + iter.remove(); + } else { + schedReq.getResourceSizing().setNumAllocations(numAllocations); + } + } + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java new file mode 100644 index 0000000..c216ace --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -0,0 +1,364 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.client.AMRMClientUtils; 
+import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A component that sits in between AMRMClient(Impl) and Yarn RM. It remembers + * pending requests similar to AMRMClient, and handles RM re-sync automatically + * without propagate the re-sync exception back to AMRMClient. + */ +public class AMRMClientRelayer extends AbstractService + implements ApplicationMasterProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(AMRMClientRelayer.class); + + private ApplicationMasterProtocol rmClient; + + /** + * The original registration request that was sent by the AM. This instance is + * reused to register/re-register with all the sub-cluster RMs. + */ + private RegisterApplicationMasterRequest amRegistrationRequest; + + /** + * Similar to AMRMClientImpl, all data structures below have two versions: + * + * The remote ones are all the pending requests that RM has not fulfill yet. + * Whenever RM fails over, we re-register and then full re-send all these + * pending requests. + * + * The non-remote ones are the requests that RM has not received yet. When RM + * throws non-fail-over exception back, the request is considered not received + * by RM. We will merge with new requests and re-send in the next heart beat. 
+ */ + private Map<ResourceRequestSetKey, ResourceRequestSet> remotePendingAsks = + new HashMap<>(); + /** + * Same as AMRMClientImpl, we need to use a custom comparator that does not + * look at ResourceRequest.getNumContainers() here. TreeSet allows a custom + * comparator. + */ + private Set<ResourceRequest> ask = + new TreeSet<>(new ResourceRequest.ResourceRequestComparator()); + + private Set<ContainerId> remotePendingRelease = new HashSet<>(); + private Set<ContainerId> release = new HashSet<>(); + + private Set<String> remoteBlacklistedNodes = new HashSet<>(); + private Set<String> blacklistAdditions = new HashSet<>(); + private Set<String> blacklistRemovals = new HashSet<>(); + + private Map<ContainerId, UpdateContainerRequest> remotePendingChange = + new HashMap<>(); + private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>(); + + private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest = + new HashMap<>(); + private List<SchedulingRequest> schedulingRequest = new ArrayList<>(); + + public AMRMClientRelayer() { + super(AMRMClientRelayer.class.getName()); + } + + public AMRMClientRelayer(ApplicationMasterProtocol rmClient) { + this(); + this.rmClient = rmClient; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(getConfig()); + try { + if (this.rmClient == null) { + this.rmClient = + ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); + } + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.rmClient != null) { + RPC.stopProxy(this.rmClient); + } + super.serviceStop(); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws 
YarnException, IOException { + // Remember the registration so that finishApplicationMaster and allocate + // can re-register with the same request after a resync + this.amRegistrationRequest = request; + return this.rmClient.registerApplicationMaster(request); + } + + /** + * Relay finishApplicationMaster to the RM. If the RM reports that this AM is + * not registered, re-register with the last saved registration request and + * retry recursively. + * + * NOTE(review): the retry is unbounded; repeated not-registered failures + * keep recursing. + */ + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + try { + return this.rmClient.finishApplicationMaster(request); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.warn("Out of sync with ResourceManager, hence resyncing."); + // re register with RM + registerApplicationMaster(this.amRegistrationRequest); + return finishApplicationMaster(request); + } + } + + /** + * Relay an allocate call to the RM while keeping enough state to replay it. + * + * <p>Two groups of fields are maintained. The remotePending* structures + * track everything relayed so far that is still outstanding; they are + * replayed into the to-send buffers on resync and pruned using RM responses. + * The to-send buffers (ask, release, blacklistAdditions, blacklistRemovals, + * change, schedulingRequest) hold what has to go out with the next call. + * + * <p>The incoming request is merged into both groups under the lock, the + * outgoing request is rebuilt from the to-send buffers, and the RM is called + * outside the lock. If that call throws + * ApplicationMasterNotRegisteredException, all remote-pending state is copied + * back into the to-send buffers, this relayer re-registers with the saved + * registration request and the call is retried recursively. Otherwise the + * remote-pending state is pruned using the response. The to-send buffers are + * cleared only when a response was obtained, so a failed call re-sends them + * next time. + * + * @param allocateRequest the allocate request to relay + * @return the RM's allocate response + * @throws YarnException if the RM call or the ask bookkeeping fails + * @throws IOException if the RM call fails + */ + @Override + public AllocateResponse allocate(AllocateRequest allocateRequest) + throws YarnException, IOException { + AllocateResponse allocateResponse = null; + try { + synchronized (this) { + // update the data structures first + // NOTE(review): unlike the fields below, the ask list is not + // null-checked; presumably getAskList() never returns null - verify + addNewAsks(allocateRequest.getAskList()); + + if (allocateRequest.getReleaseList() != null) { + this.remotePendingRelease.addAll(allocateRequest.getReleaseList()); + this.release.addAll(allocateRequest.getReleaseList()); + } + + if (allocateRequest.getResourceBlacklistRequest() != null) { + if (allocateRequest.getResourceBlacklistRequest() + .getBlacklistAdditions() != null) { + this.remoteBlacklistedNodes.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistAdditions()); + this.blacklistAdditions.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistAdditions()); + } + if (allocateRequest.getResourceBlacklistRequest() + .getBlacklistRemovals() != null) { + this.remoteBlacklistedNodes.removeAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistRemovals()); + this.blacklistRemovals.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistRemovals()); + } + } + + if (allocateRequest.getUpdateRequests() != null) { + for (UpdateContainerRequest update : allocateRequest + .getUpdateRequests()) { + this.remotePendingChange.put(update.getContainerId(), update); +
this.change.put(update.getContainerId(), update); + } + } + + if (allocateRequest.getSchedulingRequests() != null) { + AMRMClientUtils.addToOutstandingSchedulingRequests( + allocateRequest.getSchedulingRequests(), + this.remotePendingSchedRequest); + this.schedulingRequest + .addAll(allocateRequest.getSchedulingRequests()); + } + + ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size()); + for (ResourceRequest r : ask) { + // create a copy of ResourceRequest as we might change it while the + // RPC layer is using it to send info across + askList.add(ResourceRequest.newBuilder().priority(r.getPriority()) + .resourceName(r.getResourceName()).capability(r.getCapability()) + .numContainers(r.getNumContainers()) + .relaxLocality(r.getRelaxLocality()) + .nodeLabelExpression(r.getNodeLabelExpression()) + .executionTypeRequest(r.getExecutionTypeRequest()) + .allocationRequestId(r.getAllocationRequestId()).build()); + } + + // Rebuild the outgoing request from the to-send buffers, so that anything + // left over from an earlier failed call is sent again as well + allocateRequest = AllocateRequest.newBuilder() + .responseId(allocateRequest.getResponseId()) + .progress(allocateRequest.getProgress()).askList(askList) + .releaseList(new ArrayList<>(this.release)) + .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance( + new ArrayList<>(this.blacklistAdditions), + new ArrayList<>(this.blacklistRemovals))) + .updateRequests(new ArrayList<>(this.change.values())) + .schedulingRequests(new ArrayList<>(this.schedulingRequest)) + .build(); + } + + // Do the actual allocate call, outside the lock + try { + allocateResponse = this.rmClient.allocate(allocateRequest); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + + synchronized (this) { + // Add all remotePending data into to-send data structures + for (ResourceRequestSet requestSet : this.remotePendingAsks + .values()) { + for (ResourceRequest rr : requestSet.getRRs()) { + addResourceRequestToAsk(rr); + } + } + this.release.addAll(this.remotePendingRelease); +
this.blacklistAdditions.addAll(this.remoteBlacklistedNodes); + this.change.putAll(this.remotePendingChange); + for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest + .values()) { + this.schedulingRequest.addAll(reqs); + } + } + + // re register with RM, then retry allocate recursively + // NOTE(review): the retried request keeps its original responseId and + // the retry is unbounded; confirm whether the responseId should be reset + // to 0 after re-registering + registerApplicationMaster(this.amRegistrationRequest); + return allocate(allocateRequest); + } + + synchronized (this) { + // Process the allocate response from RM + // Only release/change/scheduling-request bookkeeping is pruned here; + // remotePendingAsks is only updated from incoming asks in addNewAsks + if (allocateResponse.getCompletedContainersStatuses() != null) { + for (ContainerStatus container : allocateResponse + .getCompletedContainersStatuses()) { + this.remotePendingRelease.remove(container.getContainerId()); + this.remotePendingChange.remove(container.getContainerId()); + } + } + + if (allocateResponse.getUpdatedContainers() != null) { + for (UpdatedContainer updatedContainer : allocateResponse + .getUpdatedContainers()) { + this.remotePendingChange + .remove(updatedContainer.getContainer().getId()); + } + } + + AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getAllocatedContainers(), + this.remotePendingSchedRequest); + AMRMClientUtils.removeFromOutstandingSchedulingRequests( + allocateResponse.getContainersFromPreviousAttempts(), + this.remotePendingSchedRequest); + } + + } finally { + synchronized (this) { + /* + * If allocateResponse is null, it means exception happened and RM did + * not accept the request. Don't clear any data structures so that they + * will be re-sent next time. + * + * Otherwise request was accepted by RM, we are safe to clear these.
+ */ + // In the resync path this frame's allocateResponse stays null; the + // recursive allocate call does its own clearing + if (allocateResponse != null) { + this.ask.clear(); + this.release.clear(); + + this.blacklistAdditions.clear(); + this.blacklistRemovals.clear(); + + this.change.clear(); + this.schedulingRequest.clear(); + } + } + } + return allocateResponse; + } + + /** + * Record new asks: each RR is queued in the to-send ask buffer and merged + * into remotePendingAsks, grouped by {@link ResourceRequestSetKey}. Then + * every touched set whose container count is zero is dropped, and zero-count + * non-ANY RRs are pruned from the remaining touched sets. + * + * @param asks the asks from the incoming allocate request + * @throws YarnException if an RR cannot be keyed or merged into its set + */ + private void addNewAsks(List<ResourceRequest> asks) throws YarnException { + Set<ResourceRequestSetKey> touchedKeys = new HashSet<>(); + for (ResourceRequest rr : asks) { + addResourceRequestToAsk(rr); + + ResourceRequestSetKey key = new ResourceRequestSetKey(rr); + touchedKeys.add(key); + + ResourceRequestSet askSet = this.remotePendingAsks.get(key); + if (askSet == null) { + askSet = new ResourceRequestSet(key); + this.remotePendingAsks.put(key, askSet); + } + askSet.addAndOverrideRR(rr); + } + + // Cleanup properly if needed + for (ResourceRequestSetKey key : touchedKeys) { + ResourceRequestSet askSet = this.remotePendingAsks.get(key); + if (askSet.getNumContainers() == 0) { + this.remotePendingAsks.remove(key); + } else { + // Remove non-any zero RRs + askSet.cleanupZeroNonAnyRR(); + } + } + } + + /** + * Queue an RR in the to-send ask buffer, replacing any entry that the buffer + * treats as the same request. + */ + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { + // The ResourceRequestComparator doesn't look at container count when + // comparing.
So we need to make sure the new RR override the old if any + this.ask.remove(remoteRequest); + this.ask.add(remoteRequest); + } + + /** + * @return the live remote-pending ask map (not a copy), visible for tests + */ + @VisibleForTesting + protected Map<ResourceRequestSetKey, ResourceRequestSet> + getRemotePendingAsks() { + return this.remotePendingAsks; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java index 3931f2b..91924da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java @@ -27,12 +27,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import 
org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java new file mode 100644 index 0000000..6289500 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java new file mode 100644 index 0000000..b1e6b6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.scheduler; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A set of resource requests of the same scheduler key + * {@link ResourceRequestSetKey}, holding at most one {@link ResourceRequest} + * per resource name. This class does no internal synchronization. + */ +public class ResourceRequestSet { + + private final ResourceRequestSetKey key; + // Container count of the set: taken from the ANY RR for GUARANTEED sets and + // from the most recently added RR for OPPORTUNISTIC sets + private int numContainers; + // ResourceName -> RR + private Map<String, ResourceRequest> asks; + + /** + * Create an empty set with the given key. The container count starts at + * zero, so the new set initially looks like a cancel. + * + * @param key the key of the request set + * @throws YarnException declared, but not thrown by the current + * implementation + */ + public ResourceRequestSet(ResourceRequestSetKey key) throws YarnException { + this.key = key; + // leave it zero for now, as if it is a cancel + this.numContainers = 0; + this.asks = new HashMap<>(); + } + + /** + * Create a shallow copy of the request set: the map is copied, but the + * {@link ResourceRequest} objects are shared with {@code other}. + * + * @param other the set to copy from + */ + public ResourceRequestSet(ResourceRequestSet other) { + this.key = other.key; + this.numContainers = other.numContainers; + // The assumption is that the RR objects should not be modified without + // making a copy + this.asks = new HashMap<>(other.asks); + } + + /** + * Add a {@link ResourceRequest} into the requestSet. If there's already an RR + * with the same resource name, override it and update accordingly.
+ * + * <p>For a GUARANTEED set only the ANY RR updates the container count; for + * an OPPORTUNISTIC set every RR does. The ask is validated before any state + * is changed, so a rejected ask leaves this set untouched. + * + * @param ask the new {@link ResourceRequest} + * @throws YarnException if the ask cannot be keyed, its key does not match + * this set's key, or it would make the container count negative + */ + public void addAndOverrideRR(ResourceRequest ask) throws YarnException { + if (!this.key.equals(new ResourceRequestSetKey(ask))) { + throw new YarnException( + "None compatible asks: \n" + ask + "\n" + this.key); + } + + int newNumContainers = this.numContainers; + if (this.key.getExeType().equals(ExecutionType.GUARANTEED)) { + // For G requestSet, update the numContainers only for ANY RR + if (ask.getResourceName().equals(ResourceRequest.ANY)) { + newNumContainers = ask.getNumContainers(); + } + } else { + // The assumption we made about O asks is that all RR in a requestSet has + // the same numContainers value. So we just take the value of the last RR + newNumContainers = ask.getNumContainers(); + } + if (newNumContainers < 0) { + throw new YarnException("numContainers becomes " + newNumContainers + + " when adding ask " + ask + "\n requestSet: " + toString()); + } + + // Validation passed: override directly if exists, then commit the count + this.asks.put(ask.getResourceName(), ask); + this.numContainers = newNumContainers; + } + + /** + * Merge a requestSet into this one by adding each of its RRs in turn via + * {@link #addAndOverrideRR(ResourceRequest)}. A {@code null} argument is a + * no-op. + * + * @param requestSet the requestSet to merge + * @throws YarnException if any RR of {@code requestSet} is rejected + */ + public void addAndOverrideRRSet(ResourceRequestSet requestSet) + throws YarnException { + if (requestSet == null) { + return; + } + for (ResourceRequest rr : requestSet.getRRs()) { + addAndOverrideRR(rr); + } + } + + /** + * Remove every non-ANY ResourceRequest whose container count is zero; the + * ANY RR is always kept. This is necessary cleanup to avoid the requestSet + * getting too big.
+ */ + public void cleanupZeroNonAnyRR() { + Iterator<Entry<String, ResourceRequest>> iter = + this.asks.entrySet().iterator(); + while (iter.hasNext()) { + Entry<String, ResourceRequest> entry = iter.next(); + if (entry.getKey().equals(ResourceRequest.ANY)) { + // Do not delete ANY RR + continue; + } + if (entry.getValue().getNumContainers() == 0) { + iter.remove(); + } + } + } + + /** + * @return the live resource-name to RR map backing this set (not a copy) + */ + public Map<String, ResourceRequest> getAsks() { + return this.asks; + } + + /** + * @return a live view of the RRs in this set (not a copy) + */ + public Collection<ResourceRequest> getRRs() { + return this.asks.values(); + } + + /** + * @return the container count currently tracked for this set + */ + public int getNumContainers() { + return this.numContainers; + } + + /** + * Force set the # of containers to ask for this requestSet to a given value. + * For an OPPORTUNISTIC set every RR is replaced by a clone carrying the new + * value; for a GUARANTEED set only the ANY RR is. The existing RR objects are + * not modified, and the tracked count is only updated once the RRs were + * updated successfully. + * + * @param newValue the new # of containers value + * @throws YarnException if this set is a cancel (its count is zero), or if a + * GUARANTEED set has no ANY RR + */ + public void setNumContainers(int newValue) throws YarnException { + if (this.numContainers == 0) { + throw new YarnException("should not set numContainers to " + newValue + + " for a cancel requestSet: " + toString()); + } + + // Clone the ResourceRequest object whenever we need to change it + if (this.key.getExeType().equals(ExecutionType.OPPORTUNISTIC)) { + // The assumption we made about O asks is that all RR in a requestSet has + // the same numContainers value + Map<String, ResourceRequest> newAsks = new HashMap<>(); + for (ResourceRequest rr : this.asks.values()) { + ResourceRequest clone = cloneResourceRequest(rr); + clone.setNumContainers(newValue); + newAsks.put(clone.getResourceName(), clone); + } + this.asks = newAsks; + } else { + ResourceRequest rr = this.asks.get(ResourceRequest.ANY); + if (rr == null) { + throw new YarnException( + "No ANY RR found in requestSet with numContainers=" + + this.numContainers); + } + ResourceRequest clone = cloneResourceRequest(rr); + clone.setNumContainers(newValue); + this.asks.put(ResourceRequest.ANY, clone); + } + // Commit the new count only after the RRs were updated successfully + this.numContainers = newValue; + } + + /** + * Build a field-by-field copy of {@code rr} that can be modified without + * affecting the original. + */ + private ResourceRequest cloneResourceRequest(ResourceRequest rr) { +
return ResourceRequest.newBuilder().priority(rr.getPriority()) + .resourceName(rr.getResourceName()).capability(rr.getCapability()) + .numContainers(rr.getNumContainers()) + .relaxLocality(rr.getRelaxLocality()) + .nodeLabelExpression(rr.getNodeLabelExpression()) + .executionTypeRequest(rr.getExecutionTypeRequest()) + .allocationRequestId(rr.getAllocationRequestId()).build(); + } + + /** + * Render as {@code {<key> <count>:<resourceName> ...}}. + */ + @Override + public String toString() { + StringBuilder builder = new StringBuilder("{").append(this.key.toString()); + for (Entry<String, ResourceRequest> entry : this.asks.entrySet()) { + builder.append(' ').append(entry.getValue().getNumContainers()) + .append(':').append(entry.getKey()); + } + return builder.append('}').toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java new file mode 100644 index 0000000..4db88ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java @@ -0,0 +1,133 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License.
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * The scheduler key for a group of {@link ResourceRequest}. + * + * <p>On top of the {@link SchedulerRequestKey} fields it also keys on the + * requested {@link Resource} and the {@link ExecutionType}. + * + * TODO: after YARN-7631 is fixed by adding Resource and ExecType into + * SchedulerRequestKey, then we can directly use that. + */ +public class ResourceRequestSetKey extends SchedulerRequestKey { + + // More ResourceRequest key fields on top of SchedulerRequestKey + private final Resource resource; + private final ExecutionType execType; + + /** + * Create the key object from a {@link ResourceRequest}. A missing execution + * type request is treated as GUARANTEED. + * + * @param rr Resource request object + * @throws YarnException if the RR has a null priority or a null capability + */ + public ResourceRequestSetKey(ResourceRequest rr) throws YarnException { + this(rr.getAllocationRequestId(), rr.getPriority(), rr.getCapability(), + ((rr.getExecutionTypeRequest() == null) ? ExecutionType.GUARANTEED + : rr.getExecutionTypeRequest().getExecutionType())); + // Validated after delegating because this(...) must be the first statement + if (rr.getPriority() == null) { + throw new YarnException("Null priority in RR: " + rr); + } + if (rr.getCapability() == null) { + throw new YarnException("Null resource in RR: " + rr); + } + } + + /** + * Create the key object from member objects. A null resource defaults to a + * zero-sized resource and a null execution type defaults to GUARANTEED.
+ * + * @param allocationRequestId allocate request id of the ask + * @param priority the priority of the ask + * @param resource the resource size of the ask + * @param execType the execution type of the ask + */ + public ResourceRequestSetKey(long allocationRequestId, Priority priority, + Resource resource, ExecutionType execType) { + super(priority, allocationRequestId, null); + + if (resource == null) { + this.resource = Resource.newInstance(0, 0); + } else { + this.resource = resource; + } + if (execType == null) { + this.execType = ExecutionType.GUARANTEED; + } else { + this.execType = execType; + } + } + + /** + * @return the requested resource size of this key + */ + public Resource getResource() { + return this.resource; + } + + /** + * @return the execution type of this key + */ + public ExecutionType getExeType() { + return this.execType; + } + + /** + * Two ResourceRequestSetKeys are equal when the {@link SchedulerRequestKey} + * fields, the resource and the execution type all match. Against a plain + * {@link SchedulerRequestKey} this delegates to + * {@link SchedulerRequestKey#equals(Object)}. + * + * NOTE(review): in that mixed-type case {@link #hashCode()} still folds in + * the resource and execution type, so objects that compare equal can hash + * differently; avoid mixing the two key types in hashed collections. + */ + @Override + public boolean equals(Object obj) { + // instanceof is false for null, so no separate null check is needed + if (!(obj instanceof SchedulerRequestKey)) { + return false; + } + if (!(obj instanceof ResourceRequestSetKey)) { + return super.equals(obj); + } + ResourceRequestSetKey other = (ResourceRequestSetKey) obj; + return super.equals(other) && this.resource.equals(other.resource) + && this.execType.equals(other.execType); + } + + @Override + public int hashCode() { + return ((super.hashCode() * 37 + this.resource.hashCode()) * 41) + + this.execType.hashCode(); + } + + /** + * Order by the {@link SchedulerRequestKey} fields first, then by resource, + * then by execution type. A plain {@link SchedulerRequestKey} is compared on + * the base fields only. + */ + @Override + public int compareTo(SchedulerRequestKey other) { + int ret = super.compareTo(other); + if (ret != 0) { + return ret; + } + if (!(other instanceof ResourceRequestSetKey)) { + return ret; + } + + ResourceRequestSetKey otherKey = (ResourceRequestSetKey) other; + ret = this.resource.compareTo(otherKey.resource); + if (ret != 0) { + return ret; + } + return this.execType.compareTo(otherKey.execType); + } + + /** + * Render as {@code [id:<allocationRequestId> p:<priority> G|O r:<resource>]}. + */ + @Override + public String toString() { + // The ternary is parenthesized so that the resource suffix and the closing + // bracket are appended for both GUARANTEED and OPPORTUNISTIC keys + return "[id:" + getAllocationRequestId() + " p:" + + getPriority().getPriority() + + (this.execType.equals(ExecutionType.GUARANTEED) ? " G" : " O") + + " r:" + this.resource + "]";
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index 0fce083..c3b08d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; * Composite key for outstanding scheduler requests for any schedulable entity. * Currently it includes {@link Priority}.
*/ -public final class SchedulerRequestKey implements +public class SchedulerRequestKey implements Comparable<SchedulerRequestKey> { private final Priority priority; @@ -73,8 +73,6 @@ public final class SchedulerRequestKey implements container.getAllocationRequestId(), null); } - - public SchedulerRequestKey(Priority priority, long allocationRequestId, ContainerId containerToUpdate) { this.priority = priority; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 677c4e6..02eef29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -47,9 +47,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 3f4a110..10985e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -56,13 +56,13 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java deleted file mode 100644 index 37e2b5e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.utils; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.ClientRMProxy; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility class for AMRMClient. 
- */ -@Private -public final class AMRMClientUtils { - private static final Logger LOG = - LoggerFactory.getLogger(AMRMClientUtils.class); - - public static final String APP_ALREADY_REGISTERED_MESSAGE = - "Application Master is already registered : "; - - private AMRMClientUtils() { - } - - /** - * Handle ApplicationNotRegistered exception and re-register. - * - * @param appId application Id - * @param rmProxy RM proxy instance - * @param registerRequest the AM re-register request - * @throws YarnException if re-register fails - */ - public static void handleNotRegisteredExceptionAndReRegister( - ApplicationId appId, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest) throws YarnException { - LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", appId); - try { - rmProxy.registerApplicationMaster(registerRequest); - } catch (Exception e) { - if (e instanceof InvalidApplicationMasterRequestException - && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) { - LOG.info("Concurrent thread successfully registered, moving on."); - } else { - LOG.error("Error trying to re-register AM", e); - throw new YarnException(e); - } - } - } - - /** - * Helper method for client calling ApplicationMasterProtocol.allocate that - * handles re-register if RM fails over. 
- * - * @param request allocate request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return allocate response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static AllocateResponse allocateWithReRegister(AllocateRequest request, - ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.allocate(request); - } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // reset responseId after re-register - request.setResponseId(0); - // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, appId); - } - } - - /** - * Helper method for client calling - * ApplicationMasterProtocol.finishApplicationMaster that handles re-register - * if RM fails over. - * - * @param request finishApplicationMaster request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return finishApplicationMaster response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static FinishApplicationMasterResponse finishAMWithReRegister( - FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.finishApplicationMaster(request); - } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, appId); - } - } - - /** - * Create a proxy for the specified protocol. 
- * - * @param configuration Configuration to generate {@link ClientRMProxy} - * @param protocol Protocol for the proxy - * @param user the user on whose behalf the proxy is being created - * @param token the auth token to use for connection - * @param <T> Type information of the proxy - * @return Proxy to the RM - * @throws IOException on failure - */ - @Public - @Unstable - public static <T> T createRMProxy(final Configuration configuration, - final Class<T> protocol, UserGroupInformation user, - final Token<? extends TokenIdentifier> token) throws IOException { - try { - String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID, - YarnConfiguration.DEFAULT_RM_CLUSTER_ID); - LOG.info("Creating RMProxy to RM {} for protocol {} for user {}", - rmClusterId, protocol.getSimpleName(), user); - if (token != null) { - // preserve the token service sent by the RM when adding the token - // to ensure we replace the previous token setup by the RM. - // Afterwards we can update the service address for the RPC layer. 
- // Same as YarnServerSecurityUtils.updateAMRMToken() - user.addToken(token); - token.setService(ClientRMProxy.getAMRMTokenService(configuration)); - setAuthModeInConf(configuration); - } - final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() { - @Override - public T run() throws Exception { - return ClientRMProxy.createRMProxy(configuration, protocol); - } - }); - return proxyConnection; - - } catch (InterruptedException e) { - throw new YarnRuntimeException(e); - } - } - - private static void setAuthModeInConf(Configuration conf) { - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - SaslRpcServer.AuthMethod.TOKEN.toString()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 15e1cea..23cd3e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -126,6 +126,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import 
org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; @@ -158,7 +159,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; -import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java new file mode 100644 index 0000000..22bb1f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -0,0 +1,275 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; 
+import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for AMRMClientRelayer. + */ +public class TestAMRMClientRelayer { + + /** + * Mocked ApplicationMasterService in RM. + */ + public static class MockApplicationMasterService + implements ApplicationMasterProtocol { + + // Whether this mockRM will throw failover exception upon next heartbeat + // from AM + private boolean failover = false; + private List<ResourceRequest> lastAsk; + private List<ContainerId> lastRelease; + private List<String> lastBlacklistAdditions; + private List<String> lastBlacklistRemovals; + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + return null; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + if (this.failover) { + this.failover = false; + throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); + } + return null; + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (this.failover) { + this.failover = false; + throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); + } + this.lastAsk = request.getAskList(); + this.lastRelease = request.getReleaseList(); + this.lastBlacklistAdditions = + request.getResourceBlacklistRequest().getBlacklistAdditions(); + this.lastBlacklistRemovals = + request.getResourceBlacklistRequest().getBlacklistRemovals(); + return AllocateResponse.newInstance(0, null, null, + new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, 0, + null, null); + } + + public void setFailoverFlag() { + this.failover = true; + } + } + + private Configuration conf; + private MockApplicationMasterService mockAMS; + private AMRMClientRelayer relayer; + + // Buffer of asks that will be sent to RM in the next AM heartbeat + private 
List<ResourceRequest> asks = new ArrayList<>(); + private List<ContainerId> releases = new ArrayList<>(); + private List<String> blacklistAdditions = new ArrayList<>(); + private List<String> blacklistRemoval = new ArrayList<>(); + + @Before + public void setup() throws YarnException, IOException { + this.conf = new Configuration(); + + this.mockAMS = new MockApplicationMasterService(); + this.relayer = new AMRMClientRelayer(this.mockAMS); + + this.relayer.init(conf); + this.relayer.start(); + + this.relayer.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance("", 0, "")); + + clearAllocateRequestLists(); + } + + private void assertAsksAndReleases(int expectedAsk, int expectedRelease) { + Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size()); + Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size()); + } + + private void assertBlacklistAdditionsAndRemovals(int expectedAdditions, + int expectedRemovals) { + Assert.assertEquals(expectedAdditions, + this.mockAMS.lastBlacklistAdditions.size()); + Assert.assertEquals(expectedRemovals, + this.mockAMS.lastBlacklistRemovals.size()); + } + + private AllocateRequest getAllocateRequest() { + // Need to create a new one every time because rather than directly + // referring the lists, the protobuf impl makes a copy of the lists + return AllocateRequest.newInstance(0, 0, asks, releases, + ResourceBlacklistRequest.newInstance(blacklistAdditions, + blacklistRemoval)); + } + + private void clearAllocateRequestLists() { + this.asks.clear(); + this.releases.clear(); + this.blacklistAdditions.clear(); + this.blacklistRemoval.clear(); + } + + private static ContainerId createContainerId(int id) { + return ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), + id); + } + + protected ResourceRequest createResourceRequest(long id, String resource, + int memory, int vCores, int priority, ExecutionType execType, + int containers) { + ResourceRequest 
req = Records.newRecord(ResourceRequest.class); + req.setAllocationRequestId(id); + req.setResourceName(resource); + req.setCapability(Resource.newInstance(memory, vCores)); + req.setPriority(Priority.newInstance(priority)); + req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(execType)); + req.setNumContainers(containers); + return req; + } + + /** + * Test the proper handling of removal/cancel of resource requests. + */ + @Test + public void testResourceRequestCleanup() throws YarnException, IOException { + // Ask for two containers, one with location preference + this.asks.add(createResourceRequest(0, "node", 2048, 1, 1, + ExecutionType.GUARANTEED, 1)); + this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1, + ExecutionType.GUARANTEED, 1)); + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 2)); + this.relayer.allocate(getAllocateRequest()); + + assertAsksAndReleases(3, 0); + Assert.assertEquals(1, this.relayer.getRemotePendingAsks().size()); + ResourceRequestSet set = + this.relayer.getRemotePendingAsks().values().iterator().next(); + Assert.assertEquals(3, set.getAsks().size()); + clearAllocateRequestLists(); + + // Cancel one ask + this.asks.add(createResourceRequest(0, "node", 2048, 1, 1, + ExecutionType.GUARANTEED, 0)); + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 1)); + this.relayer.allocate(getAllocateRequest()); + + assertAsksAndReleases(2, 0); + Assert.assertEquals(1, relayer.getRemotePendingAsks().size()); + set = this.relayer.getRemotePendingAsks().values().iterator().next(); + Assert.assertEquals(2, set.getAsks().size()); + clearAllocateRequestLists(); + + // Cancel the other ask, the pending askSet should be removed + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 0)); + this.relayer.allocate(AllocateRequest.newInstance(0, 0, asks, null, null)); + + assertAsksAndReleases(1, 
0); + Assert.assertEquals(0, this.relayer.getRemotePendingAsks().size()); + } + + /** + * Test the full pending resend after RM fails over. + */ + @Test + public void testResendRequestsOnRMRestart() + throws YarnException, IOException { + ContainerId c1 = createContainerId(1); + ContainerId c2 = createContainerId(2); + ContainerId c3 = createContainerId(3); + + // Ask for two containers, one with location preference + this.asks.add(createResourceRequest(0, "node1", 2048, 1, 1, + ExecutionType.GUARANTEED, 1)); + this.asks.add(createResourceRequest(0, "rack", 2048, 1, 1, + ExecutionType.GUARANTEED, 1)); + this.asks.add(createResourceRequest(0, ResourceRequest.ANY, 2048, 1, 1, + ExecutionType.GUARANTEED, 2)); + + this.releases.add(c1); + this.blacklistAdditions.add("node1"); + this.blacklistRemoval.add("node0"); + + // 1. a fully loaded request + this.relayer.allocate(getAllocateRequest()); + assertAsksAndReleases(3, 1); + assertBlacklistAdditionsAndRemovals(1, 1); + clearAllocateRequestLists(); + + // 2. 
empty request + this.relayer.allocate(getAllocateRequest()); + assertAsksAndReleases(0, 0); + assertBlacklistAdditionsAndRemovals(0, 0); + clearAllocateRequestLists(); + + // Set RM restart and failover flag + this.mockAMS.setFailoverFlag(); + + // More requests + this.blacklistAdditions.add("node2"); + this.releases.add(c2); + this.relayer.allocate(getAllocateRequest()); + + // verify pending requests are fully re-sent + assertAsksAndReleases(3, 2); + assertBlacklistAdditionsAndRemovals(2, 0); + clearAllocateRequestLists(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 9a53a50..5740749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; -import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.ConverterUtils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3159bffc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ae28879..7dac2cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import 
org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -73,7 +74,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.proces import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; -import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org