Repository: hadoop
Updated Branches:
refs/heads/branch-2 6ed8989a6 -> 7cd0874df
MAPREDUCE-6304. Specifying node labels when submitting MR jobs. (Naganarasimha
G R via wangda)
(cherry picked from commit 3164e7d83875aa6b7435d1dfe61ac280aa277f1c)
Conflicts:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7cd0874d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7cd0874d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7cd0874d
Branch: refs/heads/branch-2
Commit: 7cd0874dfd4206e323ea9df447a77b08f5698d76
Parents: 6ed8989
Author: Wangda Tan <[email protected]>
Authored: Wed May 27 14:26:03 2015 -0700
Committer: Wangda Tan <[email protected]>
Committed: Wed May 27 14:43:20 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/rm/RMContainerAllocator.java | 19 ++++-
.../v2/app/rm/RMContainerRequestor.java | 32 ++++---
.../v2/app/rm/TestRMContainerAllocator.java | 89 +++++++++++++++++++-
.../apache/hadoop/mapreduce/MRJobConfig.java | 20 +++++
.../src/main/resources/mapred-default.xml | 35 ++++++++
.../org/apache/hadoop/mapred/YARNRunner.java | 30 ++++++-
.../apache/hadoop/mapred/TestYARNRunner.java | 16 ++++
8 files changed, 226 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt
b/hadoop-mapreduce-project/CHANGES.txt
index 65ed58b..c0b7a61 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -12,6 +12,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6364. Add a "Kill" link to Task Attempts page. (Ryu Kobayashi
via ozawa)
+ MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
+ (Naganarasimha G R via wangda)
+
IMPROVEMENTS
MAPREDUCE-6291. Correct mapred queue usage command.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 99d26a2..dc5198b 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -176,6 +176,10 @@ public class RMContainerAllocator extends
RMContainerRequestor
private ScheduleStats scheduleStats = new ScheduleStats();
+ private String mapNodeLabelExpression;
+
+ private String reduceNodeLabelExpression;
+
public RMContainerAllocator(ClientService clientService, AppContext context)
{
super(clientService, context);
this.stopped = new AtomicBoolean(false);
@@ -204,6 +208,8 @@ public class RMContainerAllocator extends
RMContainerRequestor
RackResolver.init(conf);
retryInterval =
getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+ mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
+ reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
// Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM.
retrystartTime = System.currentTimeMillis();
@@ -390,9 +396,11 @@ public class RMContainerAllocator extends
RMContainerRequestor
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
- pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+ pendingReduces.addFirst(new ContainerRequest(reqEvent,
+ PRIORITY_REDUCE, reduceNodeLabelExpression));
} else {
- pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+ pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
+ reduceNodeLabelExpression));
//reduces are added to pending and are slowly ramped up
}
}
@@ -931,7 +939,9 @@ public class RMContainerAllocator extends
RMContainerRequestor
if (event.getEarlierAttemptFailed()) {
earlierFailedMaps.add(event.getAttemptID());
- request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+ request =
+ new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
+ mapNodeLabelExpression);
LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
} else {
for (String host : event.getHosts()) {
@@ -956,7 +966,8 @@ public class RMContainerAllocator extends
RMContainerRequestor
LOG.debug("Added attempt req to rack " + rack);
}
}
- request = new ContainerRequest(event, PRIORITY_MAP);
+ request =
+ new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
}
maps.put(event.getAttemptID(), request);
addContainerReq(request);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 1666864..155711f 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -121,39 +121,43 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
+ final String nodeLabelExpression;
+
/**
* the time when this request object was formed; can be used to avoid
* aggressive preemption for recently placed requests
*/
final long requestTimeMs;
- public ContainerRequest(ContainerRequestEvent event, Priority priority) {
+ public ContainerRequest(ContainerRequestEvent event, Priority priority,
+ String nodeLabelExpression) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
- event.getRacks(), priority);
+ event.getRacks(), priority, nodeLabelExpression);
}
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
- event.getRacks(), priority, requestTimeMs);
+ event.getRacks(), priority, requestTimeMs,null);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[]
racks,
- Priority priority) {
+ Priority priority, String nodeLabelExpression) {
this(attemptID, capability, hosts, racks, priority,
- System.currentTimeMillis());
+ System.currentTimeMillis(), nodeLabelExpression);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
- Priority priority, long requestTimeMs) {
+ Priority priority, long requestTimeMs,String nodeLabelExpression) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
this.requestTimeMs = requestTimeMs;
+ this.nodeLabelExpression = nodeLabelExpression;
}
public String toString() {
@@ -390,17 +394,20 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
for (String host : req.hosts) {
// Data-local
if (!isNodeBlacklisted(host)) {
- addResourceRequest(req.priority, host, req.capability);
- }
+ addResourceRequest(req.priority, host, req.capability,
+ null);
+ }
}
// Nothing Rack-local for now
for (String rack : req.racks) {
- addResourceRequest(req.priority, rack, req.capability);
+ addResourceRequest(req.priority, rack, req.capability,
+ null);
}
// Off-switch
- addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
+ addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+ req.nodeLabelExpression);
}
protected void decContainerReq(ContainerRequest req) {
@@ -417,7 +424,7 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
}
private void addResourceRequest(Priority priority, String resourceName,
- Resource capability) {
+ Resource capability, String nodeLabelExpression) {
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@@ -439,6 +446,7 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
remoteRequest.setResourceName(resourceName);
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
+ remoteRequest.setNodeLabelExpression(nodeLabelExpression);
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -533,7 +541,7 @@ public abstract class RMContainerRequestor extends
RMCommunicator {
}
String[] hosts = newHosts.toArray(new String[newHosts.size()]);
ContainerRequest newReq = new ContainerRequest(orig.attemptID,
orig.capability,
- hosts, orig.racks, orig.priority);
+ hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
return newReq;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index c58cdbd..4644a86 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -490,7 +491,7 @@ public class TestRMContainerAllocator {
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
- new RMContainerRequestor.ContainerRequest(event1, null));
+ new RMContainerRequestor.ContainerRequest(event1, null,null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
@@ -561,6 +562,91 @@ public class TestRMContainerAllocator {
}
@Test
+ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
+
+ LOG.info("Running testMapReduceAllocationWithNodeLabelExpression");
+ Configuration conf = new Configuration();
+ /*
+ * final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1;
+ * conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+ * conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+ */
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+ conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes");
+ conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+ @Override
+ protected void register() {
+ }
+
+ @Override
+ protected ApplicationMasterProtocol createSchedulerProxy() {
+ return mockScheduler;
+ }
+ };
+
+ // create some map requests
+ ContainerRequestEvent reqMapEvents;
+ reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+ allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+ // create some reduce requests
+ ContainerRequestEvent reqReduceEvents;
+ reqReduceEvents =
+ createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+ allocator.sendRequests(Arrays.asList(reqReduceEvents));
+ allocator.schedule();
+ // verify all of the host-specific asks were sent plus one for the
+ // default rack and one for the ANY request
+ Assert.assertEquals(3, mockScheduler.lastAsk.size());
+ // verify ResourceRequest sent for MAP have appropriate node
+ // label expression as per the configuration
+ validateLabelsRequests(mockScheduler.lastAsk.get(0), false);
+ validateLabelsRequests(mockScheduler.lastAsk.get(1), false);
+ validateLabelsRequests(mockScheduler.lastAsk.get(2), false);
+
+ // assign a map task and verify we do not ask for any more maps
+ ContainerId cid0 = mockScheduler.assignContainer("map", false);
+ allocator.schedule();
+ // expect three reduce asks: the host, the default rack and the ANY request
+ Assert.assertEquals(3, mockScheduler.lastAsk.size());
+ validateLabelsRequests(mockScheduler.lastAsk.get(0), true);
+ validateLabelsRequests(mockScheduler.lastAsk.get(1), true);
+ validateLabelsRequests(mockScheduler.lastAsk.get(2), true);
+
+ // label expressions for map and reduce asks are verified; close the allocator
+ allocator.close();
+ }
+
+ private void validateLabelsRequests(ResourceRequest resourceRequest,
+ boolean isReduce) {
+ switch (resourceRequest.getResourceName()) {
+ case "map":
+ case "reduce":
+ case NetworkTopology.DEFAULT_RACK:
+ Assert.assertNull(resourceRequest.getNodeLabelExpression());
+ break;
+ case "*":
+ Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes",
+ resourceRequest.getNodeLabelExpression());
+ break;
+ default:
+ Assert.fail("Invalid resource location "
+ + resourceRequest.getResourceName());
+ }
+ }
+
+ @Test
public void testMapReduceScheduling() throws Exception {
LOG.info("Running testMapReduceScheduling");
@@ -1497,6 +1583,7 @@ public class TestRMContainerAllocator {
.getNumContainers(), req.getRelaxLocality());
askCopy.add(reqCopy);
}
+ SecurityUtil.setTokenServiceUseIp(false);
lastAsk = ask;
lastRelease = release;
lastBlacklistAdditions = blacklistAdditions;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 336907d..61d1ce7 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -68,6 +68,26 @@ public interface MRJobConfig {
public static final String QUEUE_NAME = "mapreduce.job.queuename";
+ /**
+ * Node Label expression applicable for all Job containers.
+ */
+ public static final String JOB_NODE_LABEL_EXP = "mapreduce.job.node-label-expression";
+
+ /**
+ * Node Label expression applicable for AM containers.
+ */
+ public static final String AM_NODE_LABEL_EXP = "mapreduce.job.am.node-label-expression";
+
+ /**
+ * Node Label expression applicable for map containers.
+ */
+ public static final String MAP_NODE_LABEL_EXP = "mapreduce.map.node-label-expression";
+
+ /**
+ * Node Label expression applicable for reduce containers.
+ */
+ public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
+
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index fe29c03..1b109f8 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1567,6 +1567,41 @@
<!-- MR YARN Application properties -->
<property>
+ <name>mapreduce.job.node-label-expression</name>
+ <description>All the containers of the Map Reduce job will be run with this
+ node label expression. If the node-label-expression for job is not set, then
+ it will use queue's default-node-label-expression for all job's containers.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.job.am.node-label-expression</name>
+ <description>This is node-label configuration for Map Reduce Application Master
+ container. If not configured it will make use of
+ mapreduce.job.node-label-expression and if job's node-label expression is not
+ configured then it will use queue's default-node-label-expression.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.map.node-label-expression</name>
+ <description>This is node-label configuration for Map task containers. If not
+ configured it will use mapreduce.job.node-label-expression and if job's
+ node-label expression is not configured then it will use queue's
+ default-node-label-expression.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.reduce.node-label-expression</name>
+ <description>This is node-label configuration for Reduce task containers. If
+ not configured it will use mapreduce.job.node-label-expression and if job's
+ node-label expression is not configured then it will use queue's
+ default-node-label-expression.
+ </description>
+</property>
+
+<property>
<name>mapreduce.job.counters.limit</name>
<value>120</value>
<description>Limit on the number of user counters allowed per job.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 8e57607..2bb2483 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,8 +76,10 @@ import
org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
- private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private final static RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ public final static Priority AM_CONTAINER_PRIORITY = recordFactory
+ .newRecordInstance(Priority.class);
+ static {
+ AM_CONTAINER_PRIORITY.setPriority(0);
+ }
+
private ResourceMgrDelegate resMgrDelegate;
private ClientCache clientCache;
private Configuration conf;
@@ -525,6 +535,24 @@ public class YARNRunner implements ClientProtocol {
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);
+
+ // set labels for the AM container request if present
+ String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
+ if (null != amNodelabelExpression
+ && amNodelabelExpression.trim().length() != 0) {
+ ResourceRequest amResourceRequest =
+ recordFactory.newRecordInstance(ResourceRequest.class);
+ amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
+ amResourceRequest.setResourceName(ResourceRequest.ANY);
+ amResourceRequest.setCapability(capability);
+ amResourceRequest.setNumContainers(1);
+ amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+ appContext.setAMContainerResourceRequest(amResourceRequest);
+ }
+ // set labels for the Job containers
+ appContext.setNodeLabelExpression(jobConf
+ .get(JobContext.JOB_NODE_LABEL_EXP));
+
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd0874d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index c427975..0e53ab0 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -547,6 +547,22 @@ public class TestYARNRunner extends TestCase {
}
@Test
+ public void testNodeLabelExp() throws Exception {
+ JobConf jobConf = new JobConf();
+
+ jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
+ jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
+
+ YARNRunner yarnRunner = new YARNRunner(jobConf);
+ ApplicationSubmissionContext appSubCtx =
+ buildSubmitContext(yarnRunner, jobConf);
+
+ assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
+ assertEquals(appSubCtx.getAMContainerResourceRequest()
+ .getNodeLabelExpression(), "highMem");
+ }
+
+ @Test
public void testAMStandardEnv() throws Exception {
final String ADMIN_LIB_PATH = "foo";
final String USER_LIB_PATH = "bar";