Author: sandy
Date: Mon Jan 13 20:51:24 2014
New Revision: 1557845
URL: http://svn.apache.org/r1557845
Log:
MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group (Ted
Malaska via Sandy Ryza)
Added:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
Removed:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SimpleGroupsMapping.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jan 13 20:51:24 2014
@@ -55,6 +55,9 @@ Release 1.3.0 - unreleased
MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners
(tucu)
+ MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group
+ (Ted Malaska via Sandy Ryza)
+
BUG FIXES
HADOOP-9863. Backport HADOOP-8686 to support BigEndian on ppc64.
Modified:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
---
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
(original)
+++
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
Mon Jan 13 20:51:24 2014
@@ -39,6 +39,7 @@ public class PoolPlacementPolicy {
new HashMap<String, Class<? extends PoolPlacementRule>>();
map.put("user", PoolPlacementRule.User.class);
map.put("primaryGroup", PoolPlacementRule.PrimaryGroup.class);
+ map.put("secondaryGroupExistingPool",
PoolPlacementRule.SecondaryGroupExistingPool.class);
map.put("specified", PoolPlacementRule.Specified.class);
map.put("default", PoolPlacementRule.Default.class);
map.put("reject", PoolPlacementRule.Reject.class);
Modified:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
---
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
(original)
+++
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
Mon Jan 13 20:51:24 2014
@@ -58,7 +58,7 @@ public abstract class PoolPlacementRule
*/
public String assignJobToPool(String requestedPool, String user,
Groups groups, Collection<String> configuredPools) throws IOException {
- String pool = getPoolForJob(requestedPool, user, groups);
+ String pool = getPoolForJob(requestedPool, user, groups, configuredPools);
if (create || configuredPools.contains(pool)) {
return pool;
} else {
@@ -98,12 +98,14 @@ public abstract class PoolPlacementRule
* The user submitting the job.
* @param groups
* The groups of the user submitting the job.
+ * @param configuredPools
+ * The pools specified in the scheduler configuration.
* @return
* The name of the Pool to assign the job to, or null to empty string
* continue to the next rule.
*/
protected abstract String getPoolForJob(String requestedPool, String user,
- Groups groups) throws IOException;
+ Groups groups, Collection<String> configuredQueues) throws IOException;
/**
* Places jobs in pools by username of the submitter
@@ -111,7 +113,7 @@ public abstract class PoolPlacementRule
public static class User extends PoolPlacementRule {
@Override
protected String getPoolForJob(String requestedPool,
- String user, Groups groups) {
+ String user, Groups groups, Collection<String> configuredPools) {
if (user != null) {
return user;
} else {
@@ -131,7 +133,8 @@ public abstract class PoolPlacementRule
public static class PrimaryGroup extends PoolPlacementRule {
@Override
protected String getPoolForJob(String requestedPool,
- String user, Groups groups) throws IOException {
+ String user, Groups groups,
+ Collection<String> configuredPools) throws IOException {
if (user == null) {
return Pool.DEFAULT_POOL_NAME;
}
@@ -150,13 +153,41 @@ public abstract class PoolPlacementRule
}
}
+
+ /**
+ * Places jobs in pools by secondary group of the submitter.
+ *
+ * The match is made on the first secondary group that exists as a
+ * configured pool; if none matches, an empty string is returned.
+ */
+ public static class SecondaryGroupExistingPool extends PoolPlacementRule {
+ @Override
+ protected String getPoolForJob(String requestedPool,
+ String user, Groups groups,
+ Collection<String> configuredPools) throws IOException {
+ List<String> groupNames = groups.getGroups(user);
+ // Index 0 is skipped; presumably it is the primary group - confirm.
+ for (int i = 1; i < groupNames.size(); i++) {
+ if (configuredPools.contains(groupNames.get(i))) {
+ return groupNames.get(i);
+ }
+ }
+ return "";
+ }
+ // NOTE(review): returns create although getPoolForJob can return "" - confirm.
+ @Override
+ public boolean isTerminal() {
+ return create;
+ }
+ }
+
/**
* Places jobs in pools by requested pool of the submitter
*/
public static class Specified extends PoolPlacementRule {
@Override
protected String getPoolForJob(String requestedPool,
- String user, Groups groups) {
+ String user, Groups groups, Collection<String> configuredPools) {
if (requestedPool.equals(Pool.DEFAULT_POOL_NAME)) {
return "";
} else {
@@ -176,7 +207,7 @@ public abstract class PoolPlacementRule
public static class Default extends PoolPlacementRule {
@Override
protected String getPoolForJob(String requestedPool, String user,
- Groups groups) {
+ Groups groups, Collection<String> configuredPools) {
return Pool.DEFAULT_POOL_NAME;
}
@@ -198,7 +229,7 @@ public abstract class PoolPlacementRule
@Override
protected String getPoolForJob(String requestedPool, String user,
- Groups groups) {
+ Groups groups, Collection<String> configuredPools) {
throw new UnsupportedOperationException();
}
Added:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java?rev=1557845&view=auto
==============================================================================
---
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
(added)
+++
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
Mon Jan 13 20:51:24 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class SimpleGroupsMapping implements GroupMappingServiceProvider {
+ // Test stub: a user maps to <user>group, <user>subgroup1, <user>subgroup2.
+ @Override
+ public List<String> getGroups(String user) {
+ return Arrays.asList(user + "group", user + "subgroup1",
+ user + "subgroup2");
+ }
+ // The cache operations below are not supported by this stub and always throw.
+ @Override
+ public void cacheGroupsRefresh() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cacheGroupsAdd(List<String> groups) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+}
Modified:
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
---
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Mon Jan 13 20:51:24 2014
@@ -601,24 +601,33 @@ public class TestFairScheduler extends T
private JobInProgress submitJobNotInitialized(int state, int maps, int
reduces)
throws IOException {
- return submitJob(state, maps, reduces, null, null, false);
+ return submitJob(state, maps, reduces, null, null, false, null);
}
private JobInProgress submitJob(int state, int maps, int reduces)
throws IOException {
- return submitJob(state, maps, reduces, null, null, true);
+ return submitJob(state, maps, reduces, null, null, true, null);
}
private JobInProgress submitJob(int state, int maps, int reduces, String
pool)
throws IOException {
- return submitJob(state, maps, reduces, pool, null, true);
+ return submitJob(state, maps, reduces, pool, null, true, null);
}
private JobInProgress submitJob(int state, int maps, int reduces, String
pool,
String[][] mapInputLocations, boolean initializeJob) throws IOException {
+ return submitJob(state, maps, reduces, pool, mapInputLocations,
initializeJob, null);
+ }
+
+ private JobInProgress submitJob(int state, int maps, int reduces, String
pool,
+ String[][] mapInputLocations, boolean initializeJob,
+ String userName) throws IOException {
JobConf jobConf = new JobConf(conf);
jobConf.setNumMapTasks(maps);
jobConf.setNumReduceTasks(reduces);
+ if (userName != null) {
+ jobConf.set("user.name", userName);
+ }
if (pool != null)
jobConf.set(POOL_PROPERTY, pool);
JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
@@ -3098,10 +3107,14 @@ public class TestFairScheduler extends T
rules.add(new PoolPlacementRule.Specified().initialize(true, null));
rules.add(new PoolPlacementRule.User().initialize(false, null));
rules.add(new PoolPlacementRule.PrimaryGroup().initialize(false, null));
+ rules.add(new
PoolPlacementRule.SecondaryGroupExistingPool().initialize(false, null));
rules.add(new PoolPlacementRule.Default().initialize(true, null));
Set<String> pools = new HashSet();
pools.add("user1");
pools.add("user3group");
+ pools.add("user4subgroup1");
+ pools.add("user4subgroup2");
+ pools.add("user5subgroup2");
placementPolicyConfig.set("user.name", "user1");
PoolManager poolManager = scheduler.getPoolManager();
@@ -3109,19 +3122,25 @@ public class TestFairScheduler extends T
poolManager.placementPolicy = new PoolPlacementPolicy(
rules, pools, placementPolicyConfig);
- JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
-
- job1.getJobConf().set("user.name", "user1");
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false,
"user1");
poolManager.setPool(job1, "somepool");
assertEquals("somepool", poolManager.getPoolName(job1));
poolManager.setPool(job1, "default");
assertEquals("user1", poolManager.getPoolName(job1));
- job1.getJobConf().set("user.name", "user3");
+ job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user3");
poolManager.setPool(job1, "default");
assertEquals("user3group", poolManager.getPoolName(job1));
+ job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user4");
+ poolManager.setPool(job1, "default");
+ assertEquals("user4subgroup1", poolManager.getPoolName(job1));
+
+ job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user5");
+ poolManager.setPool(job1, "default");
+ assertEquals("user5subgroup2", poolManager.getPoolName(job1));
+
job1.getJobConf().set("user.name", "otheruser");
poolManager.setPool(job1, "default");
assertEquals("default", poolManager.getPoolName(job1));