Repository: hadoop Updated Branches: refs/heads/branch-2 043b7d133 -> 4cfd248a7
YARN-6777. Support for ApplicationMasterService processing chain of interceptors. (asuresh) (cherry picked from commit 077fcf6a96e420e7f36350931722b8603d010cf1) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cfd248a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cfd248a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cfd248a Branch: refs/heads/branch-2 Commit: 4cfd248a72768a107e1fba5ad44554ce7602b871 Parents: 043b7d1 Author: Arun Suresh <[email protected]> Authored: Mon Jul 17 17:02:22 2017 -0700 Committer: Arun Suresh <[email protected]> Committed: Fri Aug 4 16:50:21 2017 -0700 ---------------------------------------------------------------------- .../ams/ApplicationMasterServiceContext.java | 29 ++++ .../ams/ApplicationMasterServiceProcessor.java | 30 ++-- .../hadoop/yarn/conf/YarnConfiguration.java | 5 +- .../src/main/resources/yarn-default.xml | 10 ++ .../resourcemanager/AMSProcessingChain.java | 102 ++++++++++++ .../ApplicationMasterService.java | 49 ++++-- .../resourcemanager/DefaultAMSProcessor.java | 67 ++++---- ...pportunisticContainerAllocatorAMService.java | 67 +++++--- .../yarn/server/resourcemanager/RMContext.java | 3 +- .../TestApplicationMasterService.java | 163 ++++++++++++++++++- 10 files changed, 445 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java new file mode 100644 index 
0000000..988c727 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceContext.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.ams; + +/** + * This is a marker interface for a context object that is injected into + * the ApplicationMasterService processor. The processor implementation + * is free to type cast this based on the availability of the context's + * implementation in the classpath. 
+ */ +public interface ApplicationMasterServiceContext { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java index b426f48..b7d925a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/ams/ApplicationMasterServiceProcessor.java @@ -38,34 +38,44 @@ import java.io.IOException; public interface ApplicationMasterServiceProcessor { /** + * Initialize with an ApplicationMasterService Context as well as the + * next processor in the chain. + * @param amsContext AMSContext. + * @param nextProcessor next ApplicationMasterServiceProcessor + */ + void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor); + + /** * Register AM attempt. * @param applicationAttemptId applicationAttemptId. * @param request Register Request. - * @return Register Response. + * @param response Register Response. * @throws IOException IOException. */ - RegisterApplicationMasterResponse registerApplicationMaster( + void registerApplicationMaster( ApplicationAttemptId applicationAttemptId, - RegisterApplicationMasterRequest request) throws IOException; + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) throws IOException; /** * Allocate call. * @param appAttemptId appAttemptId. * @param request Allocate Request. - * @return Allocate Response. 
+ * @param response Allocate Response. * @throws YarnException YarnException. */ - AllocateResponse allocate(ApplicationAttemptId appAttemptId, - AllocateRequest request) throws YarnException; + void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException; /** * Finish AM. * @param applicationAttemptId applicationAttemptId. * @param request Finish AM Request. - * @return Finish AM response. + * @param response Finish AM Response. */ - FinishApplicationMasterResponse finishApplicationMaster( + void finishApplicationMaster( ApplicationAttemptId applicationAttemptId, - FinishApplicationMasterRequest request); - + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 63bcff2..9b47609 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -99,7 +99,7 @@ public class YarnConfiguration extends Configuration { YarnConfiguration.NM_PREFIX + "log-container-debug-info.enabled"; public static final boolean DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO = false; - + //////////////////////////////// // IPC Configs //////////////////////////////// @@ -145,6 +145,9 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_RM_ADDRESS = "0.0.0.0:" + DEFAULT_RM_PORT; + public static final String 
RM_APPLICATION_MASTER_SERVICE_PROCESSORS = + RM_PREFIX + "application-master-service.processors"; + /** The actual bind address for the RM.*/ public static final String RM_BIND_HOST = RM_PREFIX + "bind-host"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5ca8d4e..826ded7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -123,6 +123,16 @@ </property> <property> + <description> + Comma separated class names of ApplicationMasterServiceProcessor + implementations. The processors will be applied in the order + they are specified. 
+ </description> + <name>yarn.resourcemanager.application-master-service.processors</name> + <value></value> + </property> + + <property> <description> This configures the HTTP endpoint for Yarn Daemons.The following values are supported: http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java new file mode 100644 index 0000000..931b1c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AMSProcessingChain.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.io.IOException; + +/** + * This maintains a chain of {@link ApplicationMasterServiceProcessor}s. + */ +class AMSProcessingChain implements ApplicationMasterServiceProcessor { + + private static final Log LOG = LogFactory.getLog(AMSProcessingChain.class); + + private ApplicationMasterServiceProcessor head; + private RMContext rmContext; + + /** + * This has to be initialized with at-least 1 Processor. + * @param rootProcessor Root processor. + */ + AMSProcessingChain(ApplicationMasterServiceProcessor rootProcessor) { + if (rootProcessor == null) { + throw new YarnRuntimeException("No root ApplicationMasterService" + + "Processor specified for the processing chain.."); + } + this.head = rootProcessor; + } + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing AMS Processing chain. 
Root Processor=[" + + this.head.getClass().getName() + "]."); + this.rmContext = (RMContext)amsContext; + // The head is initialized with a null 'next' processor + this.head.init(amsContext, null); + } + + /** + * Add a processor to the top of the chain. + * @param processor ApplicationMasterServiceProcessor + */ + public synchronized void addProcessor( + ApplicationMasterServiceProcessor processor) { + LOG.info("Adding [" + processor.getClass().getName() + "] tp top of" + + " AMS Processing chain. "); + processor.init(this.rmContext, this.head); + this.head = processor; + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse resp) throws IOException { + this.head.registerApplicationMaster(applicationAttemptId, request, resp); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + this.head.allocate(appAttemptId, request, response); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.head.finishApplicationMaster(applicationAttemptId, request, response); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 5ed0a80..7c1e30b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -88,7 +89,7 @@ public class ApplicationMasterService extends AbstractService implements private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); protected final RMContext rmContext; - private final ApplicationMasterServiceProcessor amsProcessor; + private final AMSProcessingChain amsProcessingChain; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { @@ -101,11 +102,7 @@ public class ApplicationMasterService extends AbstractService implements this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; this.rmContext = rmContext; - this.amsProcessor = createProcessor(); - } - - protected ApplicationMasterServiceProcessor createProcessor() { - return new DefaultAMSProcessor(rmContext, rScheduler); + this.amsProcessingChain = new AMSProcessingChain(new DefaultAMSProcessor()); } @Override @@ -115,6 +112,21 @@ public class ApplicationMasterService extends AbstractService implements YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + amsProcessingChain.init(rmContext, null); + List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf); + if (processors != null) { + Collections.reverse(processors); + for (ApplicationMasterServiceProcessor p : processors) { + this.amsProcessingChain.addProcessor(p); + } + } + } + + protected List<ApplicationMasterServiceProcessor> getProcessorList( + Configuration conf) { + return conf.getInstances( + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + ApplicationMasterServiceProcessor.class); } @Override @@ -165,6 +177,10 @@ public class ApplicationMasterService extends AbstractService implements YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); } + protected AMSProcessingChain getProcessingChain() { + return this.amsProcessingChain; + } + @Private public InetSocketAddress getBindAddress() { return this.masterServiceAddress; @@ -214,8 +230,12 @@ public class ApplicationMasterService extends AbstractService implements lastResponse.setResponseId(0); lock.setAllocateResponse(lastResponse); - return this.amsProcessor.registerApplicationMaster( - amrmTokenIdentifier.getApplicationAttemptId(), request); + RegisterApplicationMasterResponse response = + recordFactory.newRecordInstance( + RegisterApplicationMasterResponse.class); + this.amsProcessingChain.registerApplicationMaster( + amrmTokenIdentifier.getApplicationAttemptId(), request, response); + return response; } } @@ -260,8 +280,11 @@ public class ApplicationMasterService extends AbstractService implements } this.amLivelinessMonitor.receivedPing(applicationAttemptId); - return this.amsProcessor.finishApplicationMaster( - applicationAttemptId, request); + FinishApplicationMasterResponse response = + FinishApplicationMasterResponse.newInstance(false); + this.amsProcessingChain.finishApplicationMaster( + applicationAttemptId, request, response); + return response; } } @@ -341,8 +364,10 @@ public class ApplicationMasterService 
extends AbstractService implements throw new InvalidApplicationMasterRequestException(message); } - AllocateResponse response = this.amsProcessor.allocate( - amrmTokenIdentifier.getApplicationAttemptId(), request); + AllocateResponse response = + recordFactory.newRecordInstance(AllocateResponse.class); + this.amsProcessingChain.allocate( + amrmTokenIdentifier.getApplicationAttemptId(), request, response); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index ecb66c3..6c7b6f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -81,7 +82,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { +/** + * This is the default Application Master Service processor. It has to be the + * last processor in the {@link AMSProcessingChain}. + */ +final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { private static final Log LOG = LogFactory.getLog(DefaultAMSProcessor.class); @@ -93,17 +98,19 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final RMContext rmContext; - private final YarnScheduler scheduler; + private RMContext rmContext; - DefaultAMSProcessor(RMContext rmContext, YarnScheduler scheduler) { - this.rmContext = rmContext; - this.scheduler = scheduler; + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + this.rmContext = (RMContext)amsContext; } - public RegisterApplicationMasterResponse registerApplicationMaster( + @Override + public void registerApplicationMaster( ApplicationAttemptId applicationAttemptId, - RegisterApplicationMasterRequest request) throws IOException { + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) throws IOException { RMApp app = getRmContext().getRMApps().get( applicationAttemptId.getApplicationId()); @@ -116,8 +123,6 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { RMAuditLogger.AuditConstants.REGISTER_AM, "ApplicationMasterService", app.getApplicationId(), applicationAttemptId); - RegisterApplicationMasterResponse response = recordFactory - .newRecordInstance(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(getScheduler() .getMaximumResourceCapability(app.getQueue())); 
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId) @@ -165,11 +170,11 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { response.setSchedulerResourceTypes(getScheduler() .getSchedulingResourceTypes()); - return response; } - public AllocateResponse allocate(ApplicationAttemptId appAttemptId, - AllocateRequest request) throws YarnException { + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { handleProgress(appAttemptId, request); @@ -259,42 +264,38 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { "blacklistRemovals: " + blacklistRemovals); } RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = - recordFactory.newRecordInstance(AllocateResponse.class); if (allocation.getNMTokens() != null && !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); + response.setNMTokens(allocation.getNMTokens()); } // Notify the AM of container update errors ApplicationMasterServiceUtils.addToUpdateContainerErrors( - allocateResponse, updateErrors); + response, updateErrors); // update the response with the deltas of node status changes - handleNodeUpdates(app, allocateResponse); + handleNodeUpdates(app, response); ApplicationMasterServiceUtils.addToAllocatedContainers( - allocateResponse, allocation.getContainers()); + response, allocation.getContainers()); - allocateResponse.setCompletedContainersStatuses(appAttempt + response.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); + response.setAvailableResources(allocation.getResourceLimit()); - addToContainerUpdates(allocateResponse, allocation, + addToContainerUpdates(response, allocation, ((AbstractYarnScheduler)getScheduler()) 
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors()); - allocateResponse.setNumClusterNodes(getScheduler().getNumClusterNodes()); + response.setNumClusterNodes(getScheduler().getNumClusterNodes()); // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); + response.setPreemptionMessage(generatePreemptionMessage(allocation)); // Set application priority - allocateResponse.setApplicationPriority(app + response.setApplicationPriority(app .getApplicationPriority()); - return allocateResponse; } private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { @@ -343,20 +344,20 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { .getProgress())); } - public FinishApplicationMasterResponse finishApplicationMaster( + @Override + public void finishApplicationMaster( ApplicationAttemptId applicationAttemptId, - FinishApplicationMasterRequest request) { + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { RMApp app = getRmContext().getRMApps().get(applicationAttemptId.getApplicationId()); // For UnmanagedAMs, return true so they don't retry - FinishApplicationMasterResponse response = - FinishApplicationMasterResponse.newInstance( + response.setIsUnregistered( app.getApplicationSubmissionContext().getUnmanagedAM()); getRmContext().getDispatcher().getEventHandler().handle( new RMAppAttemptUnregistrationEvent(applicationAttemptId, request .getTrackingUrl(), request.getFinalApplicationStatus(), request .getDiagnostics())); - return response; } private PreemptionMessage generatePreemptionMessage(Allocation allocation){ @@ -416,7 +417,7 @@ class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { } protected YarnScheduler getScheduler() { - return scheduler; + return rmContext.getScheduler(); } private static void addToContainerUpdates(AllocateResponse allocateResponse, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 75eb143..dcf6276 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -23,9 +23,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords + .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; @@ -101,17 +105,29 @@ public class 
OpportunisticContainerAllocatorAMService private volatile List<RemoteNode> cachedNodes; private volatile long lastCacheUpdateTime; - class OpportunisticAMSProcessor extends DefaultAMSProcessor { + class OpportunisticAMSProcessor implements + ApplicationMasterServiceProcessor { - OpportunisticAMSProcessor(RMContext rmContext, YarnScheduler - scheduler) { - super(rmContext, scheduler); + private ApplicationMasterServiceContext context; + private ApplicationMasterServiceProcessor nextProcessor; + + private YarnScheduler getScheduler() { + return ((RMContext)context).getScheduler(); } @Override - public RegisterApplicationMasterResponse registerApplicationMaster( + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor next) { + this.context = amsContext; + // The AMSProcessingChain guarantees that 'next' is not null. + this.nextProcessor = next; + } + + @Override + public void registerApplicationMaster( ApplicationAttemptId applicationAttemptId, - RegisterApplicationMasterRequest request) throws IOException { + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) throws IOException { final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) getScheduler()).getApplicationAttempt(applicationAttemptId); if (appAttempt.getOpportunisticContainerContext() == null) { @@ -135,12 +151,14 @@ public class OpportunisticContainerAllocatorAMService tokenExpiryInterval); appAttempt.setOpportunisticContainerContext(opCtx); } - return super.registerApplicationMaster(applicationAttemptId, request); + nextProcessor.registerApplicationMaster( + applicationAttemptId, request, response); } @Override - public AllocateResponse allocate(ApplicationAttemptId appAttemptId, - AllocateRequest request) throws YarnException { + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) + throws YarnException { // Partition requests to GUARANTEED and 
OPPORTUNISTIC. OpportunisticContainerAllocator.PartitionedResourceRequests partitionedAsks = @@ -165,17 +183,22 @@ public class OpportunisticContainerAllocatorAMService if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + ApplicationMasterServiceUtils.addToAllocatedContainers( + response, oppContainers); } // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); + nextProcessor.allocate(appAttemptId, request, response); + } - AllocateResponse response = super.allocate(appAttemptId, request); - if (!oppContainers.isEmpty()) { - ApplicationMasterServiceUtils.addToAllocatedContainers( - response, oppContainers); - } - return response; + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + nextProcessor.finishApplicationMaster(applicationAttemptId, + request, response); } } @@ -237,11 +260,6 @@ public class OpportunisticContainerAllocatorAMService } @Override - protected ApplicationMasterServiceProcessor createProcessor() { - return new OpportunisticAMSProcessor(rmContext, rmContext.getScheduler()); - } - - @Override public Server getServer(YarnRPC rpc, Configuration serverConf, InetSocketAddress addr, AMRMTokenSecretManager secretManager) { if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) { @@ -262,6 +280,15 @@ public class OpportunisticContainerAllocatorAMService } @Override + protected List<ApplicationMasterServiceProcessor> getProcessorList( + Configuration conf) { + List<ApplicationMasterServiceProcessor> retVal = + super.getProcessorList(conf); + retVal.add(new OpportunisticAMSProcessor()); + return retVal; + } + + @Override public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 12d8a3b..7dc7c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; @@ -52,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS /** * Context of the ResourceManager. 
*/ -public interface RMContext { +public interface RMContext extends ApplicationMasterServiceContext { Dispatcher getDispatcher(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfd248a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 18c49bd..85a36e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -20,20 +20,29 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static java.lang.Thread.sleep; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords + .RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; @@ -44,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -61,7 +71,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; public class TestApplicationMasterService { @@ -71,13 +81,160 @@ public class TestApplicationMasterService { private final int GB = 1024; private static YarnConfiguration conf; - @BeforeClass - public static void setup() { + private static AtomicInteger beforeRegCount = new AtomicInteger(0); + private static AtomicInteger afterRegCount = new AtomicInteger(0); + private static AtomicInteger beforeAllocCount = new AtomicInteger(0); + private static AtomicInteger afterAllocCount = 
new AtomicInteger(0); + private static AtomicInteger beforeFinishCount = new AtomicInteger(0); + private static AtomicInteger afterFinishCount = new AtomicInteger(0); + private static AtomicInteger initCount = new AtomicInteger(0); + + static class TestInterceptor1 implements + ApplicationMasterServiceProcessor { + + private ApplicationMasterServiceProcessor nextProcessor; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor next) { + initCount.incrementAndGet(); + this.nextProcessor = next; + } + + @Override + public void registerApplicationMaster(ApplicationAttemptId + applicationAttemptId, RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) throws IOException { + nextProcessor.registerApplicationMaster( + applicationAttemptId, request, response); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, + AllocateResponse response) throws YarnException { + beforeAllocCount.incrementAndGet(); + nextProcessor.allocate(appAttemptId, request, response); + afterAllocCount.incrementAndGet(); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + beforeFinishCount.incrementAndGet(); + afterFinishCount.incrementAndGet(); + } + } + + static class TestInterceptor2 implements + ApplicationMasterServiceProcessor { + + private ApplicationMasterServiceProcessor nextProcessor; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor next) { + initCount.incrementAndGet(); + this.nextProcessor = next; + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) throws IOException { + beforeRegCount.incrementAndGet(); + 
nextProcessor.registerApplicationMaster(applicationAttemptId, + request, response); + afterRegCount.incrementAndGet(); + } + + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) + throws YarnException { + beforeAllocCount.incrementAndGet(); + nextProcessor.allocate(appAttemptId, request, response); + afterAllocCount.incrementAndGet(); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + beforeFinishCount.incrementAndGet(); + nextProcessor.finishApplicationMaster( + applicationAttemptId, request, response); + afterFinishCount.incrementAndGet(); + } + } + + @Before + public void setup() { conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); } + @Test(timeout = 300000) + public void testApplicationMasterInterceptor() throws Exception { + conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + TestInterceptor1.class.getName() + "," + + TestInterceptor2.class.getName()); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + int allocCount = 0; + + am1.addRequests(new String[] {"127.0.0.1"}, GB, 1, 1); + AllocateResponse alloc1Response = am1.schedule(); // send the request + allocCount++; + + // kick the scheduler + nm1.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + alloc1Response = am1.schedule(); + allocCount++; + } + + // assert 
RMIdentifier is set properly in allocated containers + Container allocatedContainer = + alloc1Response.getAllocatedContainers().get(0); + ContainerTokenIdentifier tokenId = + BuilderUtils.newContainerTokenIdentifier(allocatedContainer + .getContainerToken()); + am1.unregisterAppAttempt(); + + Assert.assertEquals(1, beforeRegCount.get()); + Assert.assertEquals(1, afterRegCount.get()); + + // The allocate calls should be incremented twice + Assert.assertEquals(allocCount * 2, beforeAllocCount.get()); + Assert.assertEquals(allocCount * 2, afterAllocCount.get()); + + // Finish should only be called once, since TestInterceptor1 + // does not forward the call. + Assert.assertEquals(1, beforeFinishCount.get()); + Assert.assertEquals(1, afterFinishCount.get()); + rm.stop(); + } + @Test(timeout = 3000000) public void testRMIdentifierOnContainerAllocation() throws Exception { MockRM rm = new MockRM(conf); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
