Author: sandy
Date: Mon Jan 13 20:51:24 2014
New Revision: 1557845

URL: http://svn.apache.org/r1557845
Log:
MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group (Ted 
Malaska via Sandy Ryza)

Added:
    
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
Removed:
    
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SimpleGroupsMapping.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
    
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
    
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jan 13 20:51:24 2014
@@ -55,6 +55,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners 
     (tucu)
 
+    MAPREDUCE-5712. Backport Fair Scheduler pool placement by secondary group
+    (Ted Malaska via Sandy Ryza)
+
   BUG FIXES
 
     HADOOP-9863. Backport HADOOP-8686 to support BigEndian on ppc64. 

Modified: 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementPolicy.java
 Mon Jan 13 20:51:24 2014
@@ -39,6 +39,7 @@ public class PoolPlacementPolicy {
         new HashMap<String, Class<? extends PoolPlacementRule>>();
     map.put("user", PoolPlacementRule.User.class);
     map.put("primaryGroup", PoolPlacementRule.PrimaryGroup.class);
+    map.put("secondaryGroupExistingPool", 
PoolPlacementRule.SecondaryGroupExistingPool.class);
     map.put("specified", PoolPlacementRule.Specified.class);
     map.put("default", PoolPlacementRule.Default.class);
     map.put("reject", PoolPlacementRule.Reject.class);

Modified: 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolPlacementRule.java
 Mon Jan 13 20:51:24 2014
@@ -58,7 +58,7 @@ public abstract class PoolPlacementRule 
    */
   public String assignJobToPool(String requestedPool, String user,
       Groups groups, Collection<String> configuredPools) throws IOException {
-    String pool = getPoolForJob(requestedPool, user, groups);
+    String pool = getPoolForJob(requestedPool, user, groups, configuredPools);
     if (create || configuredPools.contains(pool)) {
       return pool;
     } else {
@@ -98,12 +98,14 @@ public abstract class PoolPlacementRule 
    *    The user submitting the job.
    * @param groups
    *    The groups of the user submitting the job.
+   * @param configuredPools
+   *    The pools specified in the scheduler configuration.
    * @return
    *    The name of the Pool to assign the job to, or null to empty string
    *    continue to the next rule.
    */
   protected abstract String getPoolForJob(String requestedPool, String user,
-      Groups groups) throws IOException;
+      Groups groups, Collection<String> configuredPools) throws IOException;
 
   /**
    * Places jobs in pools by username of the submitter
@@ -111,7 +113,7 @@ public abstract class PoolPlacementRule 
   public static class User extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) {
+        String user, Groups groups, Collection<String> configuredPools) {
       if (user != null) {
         return user; 
       } else {
@@ -131,7 +133,8 @@ public abstract class PoolPlacementRule 
   public static class PrimaryGroup extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) throws IOException {
+        String user, Groups groups, 
+        Collection<String> configuredPools) throws IOException {
       if (user == null) {
         return Pool.DEFAULT_POOL_NAME;
       }
@@ -150,13 +153,41 @@ public abstract class PoolPlacementRule 
     }
   }
 
+  /**
+   * Places jobs in the first configured pool named after one of the
+   * submitter's secondary groups (every group after the first), else "".
+   */
+  public static class SecondaryGroupExistingPool extends PoolPlacementRule {
+    @Override
+    protected String getPoolForJob(String requestedPool,
+        String user, Groups groups,
+        Collection<String> configuredPools) throws IOException {
+      if (user == null) {
+        return "";  // no submitter, no groups: defer to the next rule
+      }
+      List<String> groupNames = groups.getGroups(user);
+      // Index 0 is treated as the primary group; only later ones are tried.
+      for (int i = 1; i < groupNames.size(); i++) {
+        if (configuredPools.contains(groupNames.get(i))) {
+          return groupNames.get(i);
+        }
+      }
+      return "";
+    }
+
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+
   /**
    * Places jobs in pools by requested pool of the submitter
    */
   public static class Specified extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool,
-        String user, Groups groups) {
+        String user, Groups groups, Collection<String> configuredPools) {
       if (requestedPool.equals(Pool.DEFAULT_POOL_NAME)) {
         return "";
       } else {
@@ -176,7 +207,7 @@ public abstract class PoolPlacementRule 
   public static class Default extends PoolPlacementRule {
     @Override
     protected String getPoolForJob(String requestedPool, String user,
-        Groups groups) {
+        Groups groups, Collection<String> configuredPools) {
       return Pool.DEFAULT_POOL_NAME;
     }
     
@@ -198,7 +229,7 @@ public abstract class PoolPlacementRule 
     
     @Override
     protected String getPoolForJob(String requestedPool, String user,
-        Groups groups) {
+        Groups groups, Collection<String> configuredPools) {
       throw new UnsupportedOperationException();
     }
     

Added: 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java?rev=1557845&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
 (added)
+++ 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/SimpleGroupsMapping.java
 Mon Jan 13 20:51:24 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class SimpleGroupsMapping implements GroupMappingServiceProvider {
+  /** Maps every user to one primary and two secondary synthetic groups. */
+  @Override
+  public List<String> getGroups(String user) {
+    return Arrays.asList(new String[] {
+        user + "group", user + "subgroup1", user + "subgroup2" });
+  }
+
+  /** Neither cache operation below is supported by this test mapping. */
+  @Override
+  public void cacheGroupsRefresh() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cacheGroupsAdd(List<String> groups) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}

Modified: 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1557845&r1=1557844&r2=1557845&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
 Mon Jan 13 20:51:24 2014
@@ -601,24 +601,33 @@ public class TestFairScheduler extends T
   
   private JobInProgress submitJobNotInitialized(int state, int maps, int 
reduces)
            throws IOException {
-    return submitJob(state, maps, reduces, null, null, false);
+    return submitJob(state, maps, reduces, null, null, false, null);
   }
 
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null, null, true);
+    return submitJob(state, maps, reduces, null, null, true, null);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String 
pool)
       throws IOException {
-    return submitJob(state, maps, reduces, pool, null, true);
+    return submitJob(state, maps, reduces, pool, null, true, null);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String 
pool,
       String[][] mapInputLocations, boolean initializeJob) throws IOException {
+    return submitJob(state, maps, reduces, pool, mapInputLocations, 
initializeJob, null);
+  }
+
+  private JobInProgress submitJob(int state, int maps, int reduces, String 
pool,
+      String[][] mapInputLocations, boolean initializeJob, 
+      String userName) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
+    if (userName != null) {
+      jobConf.set("user.name", userName);
+    }
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
@@ -3098,10 +3107,14 @@ public class TestFairScheduler extends T
     rules.add(new PoolPlacementRule.Specified().initialize(true, null));
     rules.add(new PoolPlacementRule.User().initialize(false, null));
     rules.add(new PoolPlacementRule.PrimaryGroup().initialize(false, null));
+    rules.add(new 
PoolPlacementRule.SecondaryGroupExistingPool().initialize(false, null));
     rules.add(new PoolPlacementRule.Default().initialize(true, null));
     Set<String> pools = new HashSet();
     pools.add("user1");
     pools.add("user3group");
+    pools.add("user4subgroup1");
+    pools.add("user4subgroup2");
+    pools.add("user5subgroup2");
 
     placementPolicyConfig.set("user.name", "user1");
     PoolManager poolManager = scheduler.getPoolManager();
@@ -3109,19 +3122,25 @@ public class TestFairScheduler extends T
     poolManager.placementPolicy = new PoolPlacementPolicy(
         rules, pools, placementPolicyConfig);
     
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
-
-    job1.getJobConf().set("user.name", "user1");
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, 
"user1");
     poolManager.setPool(job1, "somepool");
     assertEquals("somepool", poolManager.getPoolName(job1));
 
     poolManager.setPool(job1, "default");
     assertEquals("user1", poolManager.getPoolName(job1));
 
-    job1.getJobConf().set("user.name", "user3");
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user3");
     poolManager.setPool(job1, "default");
     assertEquals("user3group", poolManager.getPoolName(job1));
 
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user4");
+    poolManager.setPool(job1, "default");
+    assertEquals("user4subgroup1", poolManager.getPoolName(job1));
+
+    job1 = submitJob(JobStatus.RUNNING, 2, 1, null, null, false, "user5");
+    poolManager.setPool(job1, "default");
+    assertEquals("user5subgroup2", poolManager.getPoolName(job1));
+
     job1.getJobConf().set("user.name", "otheruser");
     poolManager.setPool(job1, "default");
     assertEquals("default", poolManager.getPoolName(job1));


Reply via email to