MAPREDUCE-6926. Allow MR jobs to opt out of oversubscription. Contributed by Haibo Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8757aa90 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8757aa90 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8757aa90 Branch: refs/heads/YARN-1011 Commit: 8757aa90a7c43ff523adb847e240efc2f10ca920 Parents: 6f6cac1 Author: Miklos Szegedi <szege...@apache.org> Authored: Wed Jan 10 13:21:11 2018 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Fri Sep 28 14:06:15 2018 -0700 ---------------------------------------------------------------------- .../v2/app/rm/RMContainerRequestor.java | 48 ++--- .../v2/app/rm/TestRMContainerAllocator.java | 192 +++++++++++++++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 6 + .../src/main/resources/mapred-default.xml | 8 + 4 files changed, 231 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8757aa90/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index bb3e1fa..d996690 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -111,6 +111,7 @@ public abstract class RMContainerRequestor extends RMCommunicator { 
.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); private final Set<String> blacklistRemovals = Collections .newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + private boolean optOutOfOversubscription; public RMContainerRequestor(ClientService clientService, AppContext context) { super(clientService, context); @@ -136,9 +137,11 @@ public abstract class RMContainerRequestor extends RMCommunicator { public ContainerRequest(ContainerRequestEvent event, Priority priority, String nodeLabelExpression) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, nodeLabelExpression); + event.getRacks(), priority, System.currentTimeMillis(), + nodeLabelExpression); } + @VisibleForTesting public ContainerRequest(ContainerRequestEvent event, Priority priority, long requestTimeMs) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), @@ -146,13 +149,6 @@ public abstract class RMContainerRequestor extends RMCommunicator { } public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority, String nodeLabelExpression) { - this(attemptID, capability, hosts, racks, priority, - System.currentTimeMillis(), nodeLabelExpression); - } - - public ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, Priority priority, long requestTimeMs,String nodeLabelExpression) { this.attemptID = attemptID; @@ -186,6 +182,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT); LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); + optOutOfOversubscription = conf.getBoolean( + MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT, + MRJobConfig.DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT); + LOG.info("optOutOfOversubscription is " + optOutOfOversubscription); if (blacklistDisablePercent < 
-1 || blacklistDisablePercent > 100) { throw new YarnRuntimeException("Invalid blacklistDisablePercent: " + blacklistDisablePercent @@ -398,20 +398,20 @@ public abstract class RMContainerRequestor extends RMCommunicator { for (String host : req.hosts) { // Data-local if (!isNodeBlacklisted(host)) { - addResourceRequest(req.priority, host, req.capability, + addGuaranteedResourceRequest(req.priority, host, req.capability, null); } } // Nothing Rack-local for now for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability, + addGuaranteedResourceRequest(req.priority, rack, req.capability, null); } // Off-switch - addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.nodeLabelExpression); + addGuaranteedResourceRequest(req.priority, ResourceRequest.ANY, + req.capability, req.nodeLabelExpression); } protected void decContainerReq(ContainerRequest req) { @@ -430,18 +430,18 @@ public abstract class RMContainerRequestor extends RMCommunicator { protected void addOpportunisticResourceRequest(Priority priority, Resource capability) { addResourceRequest(priority, ResourceRequest.ANY, capability, null, - ExecutionType.OPPORTUNISTIC); + ExecutionType.OPPORTUNISTIC, true); } - private void addResourceRequest(Priority priority, String resourceName, - Resource capability, String nodeLabelExpression) { + private void addGuaranteedResourceRequest(Priority priority, + String resourceName, Resource capability, String nodeLabelExpression) { addResourceRequest(priority, resourceName, capability, nodeLabelExpression, - ExecutionType.GUARANTEED); + ExecutionType.GUARANTEED, optOutOfOversubscription); } private void addResourceRequest(Priority priority, String resourceName, Resource capability, String nodeLabelExpression, - ExecutionType executionType) { + ExecutionType executionType, boolean enforceExecutionType) { Map<String, Map<Resource, ResourceRequest>> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == 
null) { @@ -464,8 +464,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { remoteRequest.setCapability(capability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); - remoteRequest.setExecutionTypeRequest( - ExecutionTypeRequest.newInstance(executionType, true)); + remoteRequest.setExecutionTypeRequest(ExecutionTypeRequest. + newInstance(executionType, enforceExecutionType)); reqMap.put(capability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); @@ -473,9 +473,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { // Note this down for next interaction with ResourceManager addResourceRequestToAsk(remoteRequest); if (LOG.isDebugEnabled()) { - LOG.debug("addResourceRequest:" + " applicationId=" + LOG.debug("addGuaranteedResourceRequest:" + " applicationId=" + applicationId.getId() + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" + + " resourceName=" + resourceName + " ExecutionType=" + executionType + + " enforceExecutionType=" + enforceExecutionType + " numContainers=" + remoteRequest.getNumContainers() + " #asks=" + ask.size()); } } @@ -559,8 +560,9 @@ public abstract class RMContainerRequestor extends RMCommunicator { } } String[] hosts = newHosts.toArray(new String[newHosts.size()]); - ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability, - hosts, orig.racks, orig.priority, orig.nodeLabelExpression); + ContainerRequest newReq = new ContainerRequest(orig.attemptID, + orig.capability, hosts, orig.racks, orig.priority, + System.currentTimeMillis(), orig.nodeLabelExpression); return newReq; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8757aa90/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java 
---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 427e6ea..5d4dcf0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -868,6 +870,196 @@ public class TestRMContainerAllocator { allocator.close(); } + /** + * Test that a MapReduce job can be configured to opt out of oversubscription, + * that is, it always waits for guaranteed resources to execute its tasks. + * This is done by setting up a MapReduce job with 2 mappers and 1 reducer + * and capturing all ResourceRequests sent from the AM to RM, then checking + * if all ResourceRequests are guaranteed and their enforceExecutionType is + * true. 
+ */ + @Test + public void testMapReduceOptingOutOversubscription() throws Exception { + List<ResourceRequest> resourceRequests = captureResourceRequests(true); + + for(ResourceRequest resourceRequest : resourceRequests) { + ExecutionTypeRequest executionTypeRequest = + resourceRequest.getExecutionTypeRequest(); + if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))) { + Assert.fail("The execution type of ResourceRequest " + resourceRequest + + " is not guaranteed or not enforced."); + } + } + } + + /** + * Test that a MapReduce job can be configured to opt in to oversubscription + * (the default). This is done by setting up a MapReduce job with 2 + * mappers and 1 reducer and capturing all ResourceRequests sent from + * the AM to RM, then checking if all ResourceRequests are guaranteed + * but their enforceExecutionType is always set to false. + */ + @Test + public void testMapReduceOptingInOversubscription() throws Exception { + List<ResourceRequest> resourceRequests = captureResourceRequests(false); + + for(ResourceRequest resourceRequest : resourceRequests) { + ExecutionTypeRequest executionTypeRequest = + resourceRequest.getExecutionTypeRequest(); + if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, false))) { + Assert.fail("The execution type of ResourceRequest " + resourceRequest + + " is not guaranteed or it is enforced."); + } + } + } + + /** + * Set up a mapreduce job with 2 mappers and 1 reducer and return + * all ResourceRequests sent from the AM to RM. 
+ */ + private List<ResourceRequest> captureResourceRequests( + boolean optOutOfOversubscription) throws Exception { + List<ResourceRequest> resourceRequests = new ArrayList<>(); + + final Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT, + optOutOfOversubscription); + + // start the resource manager + final MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // submit an application + RMApp rmApp = rm.submitApp(1024); + rm.drainEvents(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 11264); + amNodeManager.nodeHeartbeat(true); + rm.drainEvents(); + + final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + rm.drainEvents(); + + // start the MR AM and wait until it is in running state + MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( + appAttemptId, 0), 2, 1, false, + this.getClass().getName(), true, 1) { + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + protected ContainerAllocator createContainerAllocator( + ClientService clientService, AppContext context) { + return new MyContainerAllocator(rm, appAttemptId, context); + }; + }; + mrApp.submit(conf); + + Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() + .getValue(); + DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); + MyContainerAllocator allocator = (MyContainerAllocator) mrApp + .getContainerAllocator(); + mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING); + amDispatcher.await(); + + // wait until all attempts request for containers + for (Task t : job.getTasks().values()) { + mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values() + .iterator().next(), TaskAttemptStateInternal.UNASSIGNED); + } + amDispatcher.await(); + + // send map 
resource requests to RM + allocator.schedule(); + rm.drainEvents(); + for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + // wait for both map tasks to be running + amNodeManager.nodeHeartbeat(true); + rm.drainEvents(); + + allocator.schedule(); + rm.drainEvents(); + for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.MAP) { + mrApp.waitForState(t, TaskState.RUNNING); + } + } + + // finish both map tasks so that the reduce task can be scheduled + Iterator<Task> it = job.getTasks().values().iterator(); + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); + allocator.schedule(); + rm.drainEvents(); + for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + // send the reduce resource requests to RM + allocator.schedule(); + rm.drainEvents(); + for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + // wait for the reduce task to be running + amNodeManager.nodeHeartbeat(true); + rm.drainEvents(); + + allocator.schedule(); + rm.drainEvents(); + for 
(ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.REDUCE) { + mrApp.waitForState(t, TaskState.RUNNING); + } + } + + // finish the reduce task + finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); + allocator.schedule(); + rm.drainEvents(); + for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) { + resourceRequests.add(ResourceRequest.newInstance( + rr.getPriority(), rr.getResourceName(), rr.getCapability(), + rr.getNumContainers(), rr.getRelaxLocality(), + rr.getNodeLabelExpression(), rr.getExecutionTypeRequest())); + } + + return resourceRequests; + } + @Test public void testMapReduceScheduling() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8757aa90/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index ca18bfe..48887c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -1179,6 +1179,12 @@ public interface MRJobConfig { public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0; /** + * Opt out of YARN oversubscription so that 
the job always waits for + * GUARANTEED resources available in the cluster. + */ + String MR_OVERSUBSCRIPTION_OPT_OUT = "mapreduce.job.oversubscription-opt-out"; + boolean DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT = false; + /** * A comma-separated list of properties whose value will be redacted. */ String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8757aa90/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9f33d65..de725a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -2151,4 +2151,12 @@ </description> </property> +<property> + <description> + Opts out of YARN oversubscription so that the job always waits for GUARANTEED + resources available. + </description> + <name>mapreduce.job.oversubscription-opt-out</name> + <value>false</value> +</property> </configuration> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org