HIVE-18003 : add explicit jdbc connection string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e120bd8b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e120bd8b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e120bd8b

Branch: refs/heads/standalone-metastore
Commit: e120bd8b0b8b74516651f3ae9e4e7d3a170b0d4d
Parents: 89dbf4e
Author: sergey <[email protected]>
Authored: Thu Dec 14 15:55:25 2017 -0800
Committer: sergey <[email protected]>
Committed: Thu Dec 14 15:55:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../org/apache/hive/jdbc/HiveConnection.java    |   5 +
 jdbc/src/java/org/apache/hive/jdbc/Utils.java   |   1 +
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  10 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |  38 +++++-
 .../hive/ql/exec/tez/WorkloadManager.java       |  30 +++--
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 117 ++++++++++++-------
 7 files changed, 147 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a81612..711dfbd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2435,9 +2435,13 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),
-    HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+    HIVE_SERVER2_WM_WORKER_THREADS("hive.server2.wm.worker.threads", 4,
         "Number of worker threads to use to perform the synchronous operations with Tez\n" +
         "sessions for workload management (e.g. opening, closing, etc.)"),
+    HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC("hive.server2.wm.allow.any.pool.via.jdbc", false,
+        "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
+        "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
+        "multiple group mappings); if true, the user can specify any existing pool."),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index fc937e6..45acf13 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -138,6 +138,7 @@ public class HiveConnection implements java.sql.Connection {
   private TProtocolVersion protocol;
   private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE;
   private String initFile = null;
+  private String wmPool = null;
   private Properties clientInfo;
 
   /**
@@ -178,6 +179,7 @@ public class HiveConnection implements java.sql.Connection {
     if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
       initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
     }
+    wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
 
     // add supported protocols
     supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
@@ -680,6 +682,9 @@ public class HiveConnection implements java.sql.Connection {
     // set the fetchSize
     openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
       Integer.toString(fetchSize));
+    if (wmPool != null) {
+      openConf.put("set:hivevar:wmpool", wmPool);
+    }
 
     // set the session configuration
     Map<String, String> sessVars = connParams.getSessionVars();

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 855de88..bb13682 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -123,6 +123,7 @@ public class Utils {
     // Set the fetchSize
     static final String FETCH_SIZE = "fetchSize";
     static final String INIT_FILE = "initFile";
+    static final String WM_POOL = "wmPool";
 
     // --------------- Begin 2 way ssl options -------------------------
     // Use two way ssl. This param will take effect only when ssl=true

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 27799a8..1a24b44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -156,9 +156,13 @@ public class TezTask extends Task<TezWork> {
       // based on Hadoop configuration, as documented at
       // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
       String userName = ss.getUserName();
-      MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
-        : new MappingInput(ss.getUserName(),
-            UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+      List<String> groups = null;
+      if (userName == null) {
+        userName = "anonymous";
+      } else {
+        groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups();
+      }
+      MappingInput mi = new MappingInput(userName, groups, ss.getHiveVariables().get("wmpool"));
 
       WmContext wmContext = ctx.getWmContext();
       // jobConf will hold all the configuration for hadoop, tez, and hive

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 33ee8f7..5919f3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Set;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,13 +57,25 @@ class UserPoolMapping {
 
   /** Contains all the information necessary to map a query to a pool. */
   public static final class MappingInput {
-    private final String userName;
+    private final String userName, wmPool;
     private final List<String> groups;
     // TODO: we may add app name, etc. later
 
-    public MappingInput(String userName, List<String> groups) {
+    public MappingInput(String userName, List<String> groups, String wmPool) {
       this.userName = userName;
       this.groups = groups;
+      this.wmPool = wmPool;
+    }
+
+    // TODO: move these into tests when there are fewer conflicting patches pending.
+    @VisibleForTesting
+    public MappingInput(String userName) {
+      this(userName, null);
+    }
+
+    @VisibleForTesting
+    public MappingInput(String userName, List<String> groups) {
+      this(userName, groups, null);
     }
 
     public List<String> getGroups() {
@@ -73,7 +88,7 @@ class UserPoolMapping {
 
     @Override
     public String toString() {
-      return getUserName() + "; groups " + groups;
+      return "{" + getUserName() + "; pool " + wmPool + "; groups " + groups + "}";
     }
   }
 
@@ -107,17 +122,32 @@ class UserPoolMapping {
     }
   }
 
-  public String mapSessionToPoolName(MappingInput input) {
+  public String mapSessionToPoolName(MappingInput input, boolean allowAnyPool, Set<String> pools) {
+    if (allowAnyPool && input.wmPool != null) {
+      return (pools == null || pools.contains(input.wmPool)) ? input.wmPool : null;
+    }
     // For equal-priority rules, user rules come first because they are more specific (arbitrary).
     Mapping mapping = userMappings.get(input.getUserName());
+    boolean isExplicitMatch = false;
+    if (mapping != null) {
+      isExplicitMatch = isExplicitPoolMatch(input, mapping);
+      if (isExplicitMatch) return mapping.fullPoolName;
+    }
     for (String group : input.getGroups()) {
       Mapping candidate = groupMappings.get(group);
       if (candidate == null) continue;
+      isExplicitMatch = isExplicitPoolMatch(input, candidate);
+      if (isExplicitMatch) return candidate.fullPoolName;
       if (mapping == null || candidate.priority < mapping.priority) {
         mapping = candidate;
       }
     }
+    if (input.wmPool != null && !isExplicitMatch) return null;
     if (mapping != null) return mapping.fullPoolName;
     return defaultPoolPath;
   }
+
+  private static boolean isExplicitPoolMatch(MappingInput input, Mapping mapping) {
+    return input.wmPool != null && input.wmPool.equals(mapping.fullPoolName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 1f4843d..f0481fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -103,6 +103,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final QueryAllocationManager allocationManager;
   private final String yarnQueue;
   private final int amRegistryTimeoutMs;
+  private final boolean allowAnyPool;
   // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
   //       sessions, so the pool itself could internally track the sessions it gave out, since
   //       calling close on an unopened session is probably harmless.
@@ -204,11 +205,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // Only creates the expiration tracker if expiration is configured.
     expirationTracker = SessionExpirationTracker.create(conf, this);
 
-    workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS),
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build());
+    workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(
+        conf, ConfVars.HIVE_SERVER2_WM_WORKER_THREADS), new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Workload management worker %d").build());
 
-    timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
-      .setNameFormat("Workload management timeout thread").build());
+    timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true).setNameFormat("Workload management timeout thread").build());
+
+    allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
 
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
@@ -1047,7 +1051,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   private void queueGetRequestOnMasterThread(
       GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
-    String poolName = userPoolMapping.mapSessionToPoolName(req.mappingInput);
+    String poolName = userPoolMapping.mapSessionToPoolName(
+        req.mappingInput, allowAnyPool, allowAnyPool ? pools.keySet() : null);
     if (poolName == null) {
       req.future.setException(new NoPoolMappingException(
           "Cannot find any pool mapping for " + req.mappingInput));
@@ -1319,8 +1324,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  @VisibleForTesting
   public WmTezSession getSession(
-    TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception {
+      TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+    return getSession(session, input, conf, null);
+  }
+
+  public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf,
+      final WmContext wmContext) throws Exception {
     WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
     // Note: not actually used for pool sessions; verify some things like doAs are not set.
     validateConfig(conf);
@@ -1936,8 +1947,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   boolean isManaged(MappingInput input) {
     // This is always replaced atomically, so we don't care about concurrency here.
-    if (userPoolMapping != null) {
-      String mappedPool = userPoolMapping.mapSessionToPoolName(input);
+    UserPoolMapping mapping = userPoolMapping;
+    if (mapping != null) {
+      // Don't pass in the pool set - not thread safe; if the user is trying to force us to
+      // use a non-existent pool, we want to fail anyway. We will fail later during get.
+      String mappedPool = mapping.mapSessionToPoolName(input, allowAnyPool, null);
       LOG.info("Mapping input: {} mapped to pool: {}", input, mappedPool);
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index fc8f66a..98f5c58 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import java.lang.Thread.State;
@@ -86,7 +85,7 @@ public class TestWorkloadManager {
         cdl.countDown();
       }
       try {
-       session.set((WmTezSession) wm.getSession(old, new 
MappingInput(userName, null), conf, null));
+       session.set((WmTezSession) wm.getSession(old, new 
MappingInput(userName), conf));
       } catch (Throwable e) {
         error.compareAndSet(null, e);
       }
@@ -227,17 +226,17 @@ public class TestWorkloadManager {
     TezSessionState nonPool = mock(TezSessionState.class);
     when(nonPool.getConf()).thenReturn(conf);
     doNothing().when(nonPool).close(anyBoolean());
-    TezSessionState session = wm.getSession(nonPool, new MappingInput("user", 
null), conf, null);
+    TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), 
conf);
     verify(nonPool).close(anyBoolean());
     assertNotSame(nonPool, session);
     session.returnToSessionManager();
     TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
     when(diffPool.getConf()).thenReturn(conf);
     doNothing().when(diffPool).returnToSessionManager();
-    session = wm.getSession(diffPool, new MappingInput("user", null), conf, 
null);
+    session = wm.getSession(diffPool, new MappingInput("user"), conf);
     verify(diffPool).returnToSessionManager();
     assertNotSame(diffPool, session);
-    TezSessionState session2 = wm.getSession(session, new MappingInput("user", 
null), conf, null);
+    TezSessionState session2 = wm.getSession(session, new 
MappingInput("user"), conf);
     assertSame(session, session2);
   }
 
@@ -249,11 +248,11 @@ public class TestWorkloadManager {
     wm.start();
     // The queue should be ignored.
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
-    TezSessionState session = wm.getSession(null, new MappingInput("user", 
null), conf, null);
+    TezSessionState session = wm.getSession(null, new MappingInput("user"), 
conf);
     assertEquals("test", session.getQueueName());
     assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
     session.setQueueName("test2");
-    session = wm.getSession(session, new MappingInput("user", null), conf, 
null);
+    session = wm.getSession(session, new MappingInput("user"), conf);
     assertEquals("test", session.getQueueName());
   }
 
@@ -269,7 +268,7 @@ public class TestWorkloadManager {
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
     wm.start();
     WmTezSession session = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
     WmTezSession session2 = (WmTezSession) session.reopen();
@@ -287,10 +286,10 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
-    WmTezSession session = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf, null);
+    WmTezSession session = (WmTezSession) wm.getSession(null, new 
MappingInput("user"), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
-    WmTezSession session2 = (WmTezSession) wm.getSession(null, new 
MappingInput("user", null), conf, null);
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, new 
MappingInput("user"), conf);
     assertEquals(0.5, session.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -301,7 +300,7 @@ public class TestWorkloadManager {
     qam.assertWasCalledAndReset();
 
     // We never lose pool session, so we should still be able to get.
-    session = (WmTezSession) wm.getSession(null, new MappingInput("user", 
null), conf, null);
+    session = (WmTezSession) wm.getSession(null, new MappingInput("user"), 
conf);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -322,20 +321,20 @@ public class TestWorkloadManager {
     assertEquals(5, wm.getNumSessions());
     // Get all the 5 sessions; validate cluster fractions.
     WmTezSession session05of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p1", null), conf, null);
+        null, new MappingInput("p1"), conf);
     assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf, null);
+        null, new MappingInput("p2"), conf);
     assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf, null);
+        null, new MappingInput("p2"), conf);
     assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
     assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
     WmTezSession session02of06 = (WmTezSession) wm.getSession(
-        null,new MappingInput("r1", null), conf, null);
+        null,new MappingInput("r1"), conf);
     assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
     WmTezSession session04 = (WmTezSession) wm.getSession(
-        null, new MappingInput("r2", null), conf, null);
+        null, new MappingInput("r2"), conf);
     assertEquals(0.4, session04.getClusterFraction(), EPSILON);
     session05of06.returnToSessionManager();
     session03of06.returnToSessionManager();
@@ -347,6 +346,7 @@ public class TestWorkloadManager {
   @Test(timeout = 10000)
   public void testMappings() throws Exception {
     HiveConf conf = createConf();
+    conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, 
"false");
     MockQam qam = new MockQam();
     WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
         Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
@@ -363,6 +363,31 @@ public class TestWorkloadManager {
     verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
     verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
     verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+    // Check explicit pool specifications - valid cases where priority is 
changed.
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g1"), "g1"), "g1");
+    verifyMapping(wm, conf, new MappingInput("u2", groups("g1"), "u2"), "u2");
+    verifyMapping(wm, conf, new MappingInput("zzz", groups("g0", "g1"), "g1"), 
"g1");
+    // Explicit pool specification - invalid - there's no mapping that matches.
+    try {
+      TezSessionState r = wm.getSession(
+        null, new MappingInput("u0", groups("g0", "g1"), "u2"), conf);
+      fail("Expected failure, but got " + r);
+    } catch (Exception ex) {
+      // Expected.
+    }
+    // Now allow the users to specify any pools.
+    conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "true");
+    wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g0", "g1"), "u2"), 
"u2");
+    // The mapping that doesn't exist still shouldn't work.
+    try {
+      TezSessionState r = wm.getSession(
+        null, new MappingInput("u0", groups("g0", "g1"), "zzz"), conf);
+      fail("Expected failure, but got " + r);
+    } catch (Exception ex) {
+      // Expected.
+    }
   }
 
   private static void verifyMapping(
@@ -372,6 +397,9 @@ public class TestWorkloadManager {
     session.returnToSessionManager();
   }
 
+  
+
+
   @Test(timeout=10000)
   public void testQueueing() throws Exception {
     final HiveConf conf = createConf();
@@ -381,9 +409,9 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), 
conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), 
conf);
     final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
         sessionA4 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -397,7 +425,7 @@ public class TestWorkloadManager {
     assertNull(sessionA4.get());
     checkError(error);
     // While threads are blocked on A, we should still be able to get and 
return a B session.
-    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new 
MappingInput("B", null), conf, null);
+    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new 
MappingInput("B"), conf);
     sessionB1.returnToSessionManager();
     sessionB2.returnToSessionManager();
     assertNull(sessionA3.get());
@@ -425,8 +453,8 @@ public class TestWorkloadManager {
     plan.getPlan().setDefaultPoolPath("A");
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession session1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
-        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null);
+    WmTezSession session1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf),
+        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), 
conf);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -448,19 +476,19 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, 
qam);
     wm.start();
     WmTezSession session1 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
     // First, try to reuse from the same pool - should "just work".
     WmTezSession session1a = (WmTezSession) wm.getSession(
-        session1, new MappingInput("user", null), conf, null);
+        session1, new MappingInput("user"), conf);
     assertSame(session1, session1a);
     assertEquals(1.0, session1.getClusterFraction(), EPSILON);
     // Should still be able to get the 2nd session.
     WmTezSession session2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
 
     // Now try to reuse with no other sessions remaining. Should still work.
     WmTezSession session2a = (WmTezSession) wm.getSession(
-        session2, new MappingInput("user", null), conf, null);
+        session2, new MappingInput("user"), conf);
     assertSame(session2, session2a);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -517,19 +545,19 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", 
null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), 
conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("A", sessionA2.getPoolName());
     assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new 
MappingInput("B", null), conf, null);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new 
MappingInput("B"), conf);
     assertSame(sessionA1, sessionB1);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed 
from A.
     // Make sure that we can still get a session from A.
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
     assertEquals("A", sessionA3.getPoolName());
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -549,7 +577,7 @@ public class TestWorkloadManager {
     wm.start();
  
     // One session will be running, the other will be queued in "A"
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U"), conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -574,7 +602,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
     // The new session will also go to B now.
     sessionA2.get().returnToSessionManager();
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U", null), conf, null);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new 
MappingInput("U"), conf);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     sessionA1.returnToSessionManager();
@@ -598,11 +626,11 @@ public class TestWorkloadManager {
  
     // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 
running, 1 queued.
     // Total: 5/6 running.
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null),
-        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", 
null), conf, null),
-        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", 
null), conf, null),
-        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", 
null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), 
conf),
+        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), 
conf),
+        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), 
conf),
+        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), 
conf);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
         sessionD2 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -738,7 +766,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
 
     // [A: 1, B: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = 
wm.getAllSessionTriggerProviders();
@@ -762,7 +790,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
     // [A: 1, B: 1]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -789,7 +817,7 @@ public class TestWorkloadManager {
     assertEquals("B", sessionA2.getPoolName());
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
     // [A: 1, B: 2]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -829,7 +857,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, 
plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
 
     // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = 
wm.getAllSessionTriggerProviders();
@@ -887,7 +915,8 @@ public class TestWorkloadManager {
     
assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
     assertEquals("B.x", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
+
     // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -986,7 +1015,7 @@ public class TestWorkloadManager {
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
     try {
-      TezSessionState r = wm.getSession(null, new MappingInput("A", null), 
conf, null);
+      TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
       fail("Expected an error but got " + r);
     } catch (Exception ex) {
       // Expected.
@@ -1037,7 +1066,7 @@ public class TestWorkloadManager {
     assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should 
not be affected.
-    WmTezSession result = (WmTezSession) wm.getSession(null, new 
MappingInput("A", null), conf, null);
+    WmTezSession result = (WmTezSession) wm.getSession(null, new 
MappingInput("A"), conf);
     assertEquals(sessionPoolName, result.getPoolName());
     assertEquals(1f, result.getClusterFraction(), EPSILON);
     result.returnToSessionManager();

Reply via email to