Repository: hadoop Updated Branches: refs/heads/MR-6749 83b9cdf0b -> 06dcc886a
MAPREDUCE-6809. Create ContainerRequestor interface and refactor RMContainerRequestor to use it. Contributed by Devaraj K. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/06dcc886 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/06dcc886 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/06dcc886 Branch: refs/heads/MR-6749 Commit: 06dcc886a96e33cac4ddb5bea2aeab741aa6fea3 Parents: 83b9cdf Author: Naganarasimha <[email protected]> Authored: Sat Nov 12 12:28:42 2016 +0530 Committer: Naganarasimha <[email protected]> Committed: Sat Nov 12 12:28:42 2016 +0530 ---------------------------------------------------------------------- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 3 +- .../mapreduce/v2/app/rm/ContainerRequestor.java | 50 ++++++++++ .../v2/app/rm/RMContainerAllocator.java | 96 +++++++++++++------- .../v2/app/rm/RMContainerRequestor.java | 54 +++++++---- .../v2/app/rm/TestRMContainerAllocator.java | 41 ++++----- 5 files changed, 165 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 4a8a90e..85e4b50 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -110,7 +110,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; -import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; @@ -1141,7 +1140,7 @@ public class MRAppMaster extends CompositeService { @Override public Set<String> getBlacklistedNodes() { - return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes(); + return ((RMContainerAllocator) containerAllocator).getBlacklistedNodes(); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java new file mode 100644 index 0000000..2d54633 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.app.rm; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +/** + * Interface for ContainerReqestor. + */ +public interface ContainerRequestor { + + AllocateResponse makeRemoteRequest() + throws YarnRuntimeException, YarnException, IOException; + + void addContainerReq(ContainerRequest request); + + void decContainerReq(ContainerRequest request); + + void release(ContainerId containerId); + + boolean isNodeBlacklisted(String hostname); + + Resource getAvailableResources(); + + void containerFailedOnHost(String hostName); + + ContainerRequest filterRequest(ContainerRequest orig); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 4cb3cbe..9f34ea4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringInterner; @@ -94,7 +95,7 @@ import com.google.common.annotations.VisibleForTesting; /** * Allocates the container from the ResourceManager scheduler. */ -public class RMContainerAllocator extends RMContainerRequestor +public class RMContainerAllocator extends RMCommunicator implements ContainerAllocator { static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); @@ -114,6 +115,9 @@ public class RMContainerAllocator extends RMContainerRequestor private Thread eventHandlingThread; private final AtomicBoolean stopped; + @VisibleForTesting + protected RMContainerRequestor containerRequestor; + static { PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_FAST_FAIL_MAP.setPriority(5); @@ -207,6 +211,7 @@ public class RMContainerAllocator extends RMContainerRequestor return new AssignedRequests(); } + @SuppressWarnings("unchecked") @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); @@ -240,6 +245,8 @@ public class RMContainerAllocator extends RMContainerRequestor this.scheduledRequests.setNumOpportunisticMapsPer100( conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100)); + containerRequestor = new RMContainerRequestor(this); + containerRequestor.init(conf); } @Override @@ -450,8 +457,8 @@ public class RMContainerAllocator extends RMContainerRequestor removed = true; assignedRequests.remove(aId); containersReleased++; - pendingRelease.add(containerId); - release(containerId); + containerRequestor.pendingRelease.add(containerId); + containerRequestor.release(containerId); } } if (!removed) { @@ -463,7 +470,7 @@ public class RMContainerAllocator extends RMContainerRequestor event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) { ContainerFailedEvent fEv = (ContainerFailedEvent) event; String host = getHost(fEv.getContMgrAddress()); - containerFailedOnHost(host); + containerRequestor.containerFailedOnHost(host); // propagate failures to preemption policy to discard checkpoints for // failed tasks preemptionPolicy.handleFailedContainer(event.getAttemptID()); @@ -522,9 +529,11 @@ public class RMContainerAllocator extends RMContainerRequestor // The pending mappers haven't been waiting for too long. Let us see if // the headroom can fit a mapper. - Resource availableResourceForMap = getAvailableResources(); - if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, - mapResourceRequest, getSchedulerResourceTypes()) > 0) { + Resource availableResourceForMap = containerRequestor + .getAvailableResources(); + if (ResourceCalculatorUtils.computeAvailableContainers( + availableResourceForMap, mapResourceRequest, + getSchedulerResourceTypes()) > 0) { // the available headroom is enough to run a mapper return false; } @@ -601,7 +610,7 @@ public class RMContainerAllocator extends RMContainerRequestor } // get available resources for this job - Resource headRoom = getAvailableResources(); + Resource headRoom = containerRequestor.getAvailableResources(); LOG.info("Recalculating schedule, headroom=" + headRoom); @@ -732,7 +741,8 @@ public class RMContainerAllocator extends RMContainerRequestor applyConcurrentTaskLimits(); // will be null the first time - Resource headRoom = Resources.clone(getAvailableResources()); + Resource headRoom = Resources + .clone(containerRequestor.getAvailableResources()); AllocateResponse response; /* * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS @@ -740,7 +750,7 @@ public class RMContainerAllocator extends RMContainerRequestor * to contact the RM. */ try { - response = makeRemoteRequest(); + response = containerRequestor.makeRemoteRequest(); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); } catch (ApplicationAttemptNotFoundException e ) { @@ -755,9 +765,9 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.info("ApplicationMaster is out of sync with ResourceManager," + " hence resync and send outstanding requests."); // RM may have restarted, re-register with RM. - lastResponseID = 0; + containerRequestor.lastResponseID = 0; register(); - addOutstandingRequestOnResync(); + containerRequestor.addOutstandingRequestOnResync(); return null; } catch (InvalidLabelResourceRequestException e) { // If Invalid label exception is received means the requested label doesnt @@ -783,7 +793,7 @@ public class RMContainerAllocator extends RMContainerRequestor // continue to attempt to contact the RM. throw e; } - Resource newHeadRoom = getAvailableResources(); + Resource newHeadRoom = containerRequestor.getAvailableResources(); List<Container> newContainers = response.getAllocatedContainers(); // Setting NMTokens if (response.getNMTokens() != null) { @@ -823,7 +833,7 @@ public class RMContainerAllocator extends RMContainerRequestor } //Called on each allocation. Will know about newly blacklisted/added hosts. - computeIgnoreBlacklisting(); + containerRequestor.computeIgnoreBlacklisting(); handleUpdatedNodes(response); handleJobPriorityChange(response); @@ -852,7 +862,7 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.error("Container complete event for unknown container " + container.getContainerId()); } else { - pendingRelease.remove(container.getContainerId()); + containerRequestor.pendingRelease.remove(container.getContainerId()); assignedRequests.remove(attemptID); // Send the diagnostics @@ -878,11 +888,12 @@ public class RMContainerAllocator extends RMContainerRequestor int normalMapRequestLimit = Math.min( maxRequestedMaps - failedMapRequestLimit, numScheduledMaps - numScheduledFailMaps); - setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest, - failedMapRequestLimit); - setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit); - setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest, + containerRequestor.setRequestLimit(PRIORITY_FAST_FAIL_MAP, + mapResourceRequest, failedMapRequestLimit); + containerRequestor.setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit); + containerRequestor.setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, + mapResourceRequest, normalMapRequestLimit); } int numScheduledReduces = scheduledRequests.reduces.size(); @@ -891,7 +902,7 @@ public class RMContainerAllocator extends RMContainerRequestor maxRunningReduces - assignedRequests.reduces.size()); int reduceRequestLimit = Math.min(maxRequestedReduces, numScheduledReduces); - setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest, + containerRequestor.setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest, reduceRequestLimit); } } @@ -980,7 +991,7 @@ public class RMContainerAllocator extends RMContainerRequestor @Private public Resource getResourceLimit() { - Resource headRoom = getAvailableResources(); + Resource headRoom = containerRequestor.getAvailableResources(); Resource assignedMapResource = Resources.multiply(mapResourceRequest, assignedRequests.maps.size()); Resource assignedReduceResource = @@ -1032,7 +1043,7 @@ public class RMContainerAllocator extends RMContainerRequestor if (req == null) { return false; } else { - decContainerReq(req); + containerRequestor.decContainerReq(req); return true; } } @@ -1042,7 +1053,7 @@ public class RMContainerAllocator extends RMContainerRequestor if (it.hasNext()) { Entry<TaskAttemptId, ContainerRequest> entry = it.next(); it.remove(); - decContainerReq(entry.getValue()); + containerRequestor.decContainerReq(entry.getValue()); return entry.getValue(); } return null; @@ -1059,14 +1070,15 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); // If its an earlier Failed attempt, do not retry as OPPORTUNISTIC maps.put(event.getAttemptID(), request); - addContainerReq(request); + containerRequestor.addContainerReq(request); } else { if (mapsMod100 < numOpportunisticMapsPer100) { request = new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP, mapNodeLabelExpression); maps.put(event.getAttemptID(), request); - addOpportunisticResourceRequest(request.priority, request.capability); + containerRequestor.addOpportunisticResourceRequest(request.priority, + request.capability); } else { request = new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); @@ -1093,7 +1105,7 @@ public class RMContainerAllocator extends RMContainerRequestor } } maps.put(event.getAttemptID(), request); - addContainerReq(request); + containerRequestor.addContainerReq(request); } mapsMod100++; mapsMod100 %= 100; @@ -1103,7 +1115,7 @@ public class RMContainerAllocator extends RMContainerRequestor void addReduce(ContainerRequest req) { reduces.put(req.attemptID, req); - addContainerReq(req); + containerRequestor.addContainerReq(req); } // this method will change the list of allocatedContainers. @@ -1168,7 +1180,7 @@ public class RMContainerAllocator extends RMContainerRequestor // do not assign if allocated container is on a // blacklisted host String allocatedHost = allocated.getNodeId().getHost(); - if (isNodeBlacklisted(allocatedHost)) { + if (containerRequestor.isNodeBlacklisted(allocatedHost)) { // we need to request for a new container // and release the current one LOG.info("Got allocated container on a blacklisted " @@ -1182,9 +1194,9 @@ public class RMContainerAllocator extends RMContainerRequestor if (toBeReplacedReq != null) { LOG.info("Placing a new container request for task attempt " + toBeReplacedReq.attemptID); - ContainerRequest newReq = - getFilteredContainerRequest(toBeReplacedReq); - decContainerReq(toBeReplacedReq); + ContainerRequest newReq = containerRequestor + .filterRequest(toBeReplacedReq); + containerRequestor.decContainerReq(toBeReplacedReq); if (toBeReplacedReq.attemptID.getTaskId().getTaskType() == TaskType.MAP) { maps.put(newReq.attemptID, newReq); @@ -1192,7 +1204,7 @@ public class RMContainerAllocator extends RMContainerRequestor else { reduces.put(newReq.attemptID, newReq); } - addContainerReq(newReq); + containerRequestor.addContainerReq(newReq); } else { LOG.info("Could not map allocated container to a valid request." @@ -1221,7 +1233,7 @@ public class RMContainerAllocator extends RMContainerRequestor private void containerAssigned(Container allocated, ContainerRequest assigned) { // Update resource requests - decContainerReq(assigned); + containerRequestor.decContainerReq(assigned); // send the container-assigned event to task attempt eventHandler.handle(new TaskAttemptContainerAssignedEvent( @@ -1238,8 +1250,8 @@ public class RMContainerAllocator extends RMContainerRequestor private void containerNotAssigned(Container allocated) { containersReleased++; - pendingRelease.add(allocated.getId()); - release(allocated.getId()); + containerRequestor.pendingRelease.add(allocated.getId()); + containerRequestor.release(allocated.getId()); } private ContainerRequest assignWithoutLocality(Container allocated) { @@ -1604,4 +1616,18 @@ public class RMContainerAllocator extends RMContainerRequestor } + public Set<String> getBlacklistedNodes() { + return containerRequestor.getBlacklistedNodes(); + } + + public RMContainerRequestor getContainerRequestor() { + return containerRequestor; + } + + public void resetContainerForReuse(ContainerId containerId) { + TaskAttemptId attemptId = assignedRequests.get(containerId); + if (attemptId != null) { + assignedRequests.remove(attemptId); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index f4579ab..3475d75 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -36,10 +36,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; -import org.apache.hadoop.mapreduce.v2.app.AppContext; -import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -60,7 +60,8 @@ import org.apache.hadoop.yarn.util.resource.Resources; /** * Keeps the data structures to send container requests to RM. */ -public abstract class RMContainerRequestor extends RMCommunicator { +public class RMContainerRequestor extends AbstractService + implements ContainerRequestor { private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR = @@ -110,9 +111,13 @@ public abstract class RMContainerRequestor extends RMCommunicator { .newSetFromMap(new ConcurrentHashMap<String, Boolean>()); private final Set<String> blacklistRemovals = Collections .newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + private final ApplicationId applicationId; + private final RMCommunicator rmCommunicator; - public RMContainerRequestor(ClientService clientService, AppContext context) { - super(clientService, context); + public RMContainerRequestor(RMCommunicator rmCommunicator) { + super(RMContainerRequestor.class.getName()); + this.rmCommunicator = rmCommunicator; + applicationId = rmCommunicator.applicationId; } @Private @@ -193,17 +198,19 @@ public abstract class RMContainerRequestor extends RMCommunicator { LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); } - protected AllocateResponse makeRemoteRequest() throws YarnException, - IOException { + @Override + public AllocateResponse makeRemoteRequest() + throws YarnException, IOException { applyRequestLimits(); ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions), new ArrayList<String>(blacklistRemovals)); - AllocateRequest allocateRequest = - AllocateRequest.newInstance(lastResponseID, - super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask), - new ArrayList<ContainerId>(release), blacklistRequest); - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); + AllocateRequest allocateRequest = AllocateRequest.newInstance( + lastResponseID, rmCommunicator.getApplicationProgress(), + new ArrayList<ResourceRequest>(ask), + new ArrayList<ContainerId>(release), blacklistRequest); + AllocateResponse allocateResponse = rmCommunicator.scheduler + .allocate(allocateRequest); lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -323,7 +330,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { } } - protected void containerFailedOnHost(String hostName) { + @Override + public void containerFailedOnHost(String hostName) { if (!nodeBlacklistingEnabled) { return; } @@ -388,11 +396,13 @@ public abstract class RMContainerRequestor extends RMCommunicator { } } - protected Resource getAvailableResources() { + @Override + public Resource getAvailableResources() { return availableResources == null ? Resources.none() : availableResources; } - protected void addContainerReq(ContainerRequest req) { + @Override + public void addContainerReq(ContainerRequest req) { // Create resource requests for (String host : req.hosts) { // Data-local @@ -413,7 +423,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { req.nodeLabelExpression); } - protected void decContainerReq(ContainerRequest req) { + @Override + public void decContainerReq(ContainerRequest req) { // Update resource requests for (String hostName : req.hosts) { decResourceRequest(req.priority, hostName, req.capability); @@ -539,18 +550,21 @@ public abstract class RMContainerRequestor extends RMCommunicator { ask.add(remoteRequest); } - protected void release(ContainerId containerId) { + @Override + public void release(ContainerId containerId) { release.add(containerId); } - protected boolean isNodeBlacklisted(String hostname) { + @Override + public boolean isNodeBlacklisted(String hostname) { if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) { return false; } return blacklistedNodes.contains(hostname); } - - protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) { + + @Override + public ContainerRequest filterRequest(ContainerRequest orig) { ArrayList<String> newHosts = new ArrayList<String>(); for (String host : orig.hosts) { if (!isNodeBlacklisted(host)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/06dcc886/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 38a9731..1570c9b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -664,7 +665,7 @@ public class TestRMContainerAllocator { final String[] locations = new String[] { host }; allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); allocator.scheduleAllReduces(); - allocator.makeRemoteRequest(); + allocator.containerRequestor.makeRemoteRequest(); nm.nodeHeartbeat(true); dispatcher.await(); allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); @@ -2059,24 +2060,18 @@ public class TestRMContainerAllocator { public void updateSchedulerProxy(MyResourceManager rm) { scheduler = rm.getApplicationMasterService(); } - - @Override - protected AllocateResponse makeRemoteRequest() throws IOException, - YarnException { - allocateResponse = super.makeRemoteRequest(); - return allocateResponse; - } } private static class MyContainerAllocator2 extends MyContainerAllocator { public MyContainerAllocator2(MyResourceManager rm, Configuration conf, - ApplicationAttemptId appAttemptId, Job job) { + ApplicationAttemptId appAttemptId, Job job) + throws YarnException, IOException { super(rm, conf, appAttemptId, job); - } - @Override - protected AllocateResponse makeRemoteRequest() throws IOException, - YarnException { - throw new IOException("for testing"); + containerRequestor = mock(RMContainerRequestor.class); + doThrow(new IOException("for testing")).when(containerRequestor) + .makeRemoteRequest(); + doReturn(Resource.newInstance(2048, 1)).when(containerRequestor) + .getAvailableResources(); } } @@ -2100,6 +2095,7 @@ public class TestRMContainerAllocator { any(Resource.class), anyInt(), anyFloat(), anyFloat()); doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator) .getSchedulerResourceTypes(); + allocator.containerRequestor = mock(RMContainerRequestor.class); // Test slow-start allocator.scheduleReduces( @@ -2983,8 +2979,8 @@ public class TestRMContainerAllocator { Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); - Assert.assertEquals(6, allocator.getAsk().size()); - for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals(6, allocator.containerRequestor.getAsk().size()); + for (ResourceRequest req : allocator.containerRequestor.getAsk()) { boolean isReduce = req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); if (isReduce) { @@ -3009,8 +3005,8 @@ public class TestRMContainerAllocator { // indicate ramping down of reduces to scheduler. Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); Assert.assertEquals(2, allocator.getNumOfPendingReduces()); - Assert.assertEquals(3, allocator.getAsk().size()); - for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals(3, allocator.containerRequestor.getAsk().size()); + for (ResourceRequest req : allocator.containerRequestor.getAsk()) { Assert.assertEquals( RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); Assert.assertTrue(req.getResourceName().equals("*") || @@ -3037,6 +3033,7 @@ public class TestRMContainerAllocator { RMContainerAllocator containerAllocator = new RMContainerAllocatorForFinishedContainer(null, context, mock(AMPreemptionPolicy.class)); + containerAllocator.init(new Configuration()); ContainerStatus finishedContainer = ContainerStatus.newInstance( mock(ContainerId.class), ContainerState.COMPLETE, "", 0); @@ -3149,8 +3146,8 @@ public class TestRMContainerAllocator { Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); - Assert.assertEquals(6, allocator.getAsk().size()); - for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals(6, allocator.containerRequestor.getAsk().size()); + for (ResourceRequest req : allocator.containerRequestor.getAsk()) { boolean isReduce = req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE); if (isReduce) { @@ -3178,8 +3175,8 @@ public class TestRMContainerAllocator { // indicate ramping down of reduces to scheduler. Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size()); Assert.assertEquals(2, allocator.getNumOfPendingReduces()); - Assert.assertEquals(3, allocator.getAsk().size()); - for (ResourceRequest req : allocator.getAsk()) { + Assert.assertEquals(3, allocator.containerRequestor.getAsk().size()); + for (ResourceRequest req : allocator.containerRequestor.getAsk()) { Assert.assertEquals( RMContainerAllocator.PRIORITY_REDUCE, req.getPriority()); Assert.assertTrue(req.getResourceName().equals("*") || --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
