HIVE-18003 : add explicit jdbc connection string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e120bd8b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e120bd8b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e120bd8b Branch: refs/heads/standalone-metastore Commit: e120bd8b0b8b74516651f3ae9e4e7d3a170b0d4d Parents: 89dbf4e Author: sergey <[email protected]> Authored: Thu Dec 14 15:55:25 2017 -0800 Committer: sergey <[email protected]> Committed: Thu Dec 14 15:55:25 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 6 +- .../org/apache/hive/jdbc/HiveConnection.java | 5 + jdbc/src/java/org/apache/hive/jdbc/Utils.java | 1 + .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 10 +- .../hive/ql/exec/tez/UserPoolMapping.java | 38 +++++- .../hive/ql/exec/tez/WorkloadManager.java | 30 +++-- .../hive/ql/exec/tez/TestWorkloadManager.java | 117 ++++++++++++------- 7 files changed, 147 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7a81612..711dfbd 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2435,9 +2435,13 @@ public class HiveConf extends Configuration { HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "", "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" + "workload management is enabled and used for these sessions."), - HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4, + HIVE_SERVER2_WM_WORKER_THREADS("hive.server2.wm.worker.threads", 4, "Number of worker threads to use to perform the synchronous operations with Tez\n" + "sessions for workload management (e.g. opening, closing, etc.)"), + HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC("hive.server2.wm.allow.any.pool.via.jdbc", false, + "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" + + "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" + + "multiple group mappings); if true, the user can specify any existing pool."), HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s", new TimeValidator(TimeUnit.SECONDS), "The timeout for AM registry registration, after which (on attempting to use the\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index fc937e6..45acf13 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -138,6 +138,7 @@ public class HiveConnection implements java.sql.Connection { private TProtocolVersion protocol; private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; private String initFile = null; + private String wmPool = null; private Properties clientInfo; /** @@ -178,6 +179,7 @@ public class HiveConnection implements java.sql.Connection { if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) { initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE); } + wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL); // add supported protocols supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); @@ -680,6 +682,9 @@ public class HiveConnection implements java.sql.Connection { // set the fetchSize openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", Integer.toString(fetchSize)); + if (wmPool != null) { + openConf.put("set:hivevar:wmpool", wmPool); + } // set the session configuration Map<String, String> sessVars = connParams.getSessionVars(); http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/Utils.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 855de88..bb13682 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -123,6 +123,7 @@ public class Utils { // Set the fetchSize static final String FETCH_SIZE = "fetchSize"; static final String INIT_FILE = "initFile"; + static final String WM_POOL = "wmPool"; // --------------- Begin 2 way ssl options ------------------------- // Use two way ssl. This param will take effect only when ssl=true http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 27799a8..1a24b44 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -156,9 +156,13 @@ public class TezTask extends Task<TezWork> { // based on Hadoop configuration, as documented at // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html String userName = ss.getUserName(); - MappingInput mi = (userName == null) ? new MappingInput("anonymous", null) - : new MappingInput(ss.getUserName(), - UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups()); + List<String> groups = null; + if (userName == null) { + userName = "anonymous"; + } else { + groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups(); + } + MappingInput mi = new MappingInput(userName, groups, ss.getHiveVariables().get("wmpool")); WmContext wmContext = ctx.getWmContext(); // jobConf will hold all the configuration for hadoop, tez, and hive http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java index 33ee8f7..5919f3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import com.google.common.annotations.VisibleForTesting; + +import java.util.Set; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,13 +57,25 @@ class UserPoolMapping { /** Contains all the information necessary to map a query to a pool. */ public static final class MappingInput { - private final String userName; + private final String userName, wmPool; private final List<String> groups; // TODO: we may add app name, etc. later - public MappingInput(String userName, List<String> groups) { + public MappingInput(String userName, List<String> groups, String wmPool) { this.userName = userName; this.groups = groups; + this.wmPool = wmPool; + } + + // TODO: move these into tests when there are fewer conflicting patches pending. + @VisibleForTesting + public MappingInput(String userName) { + this(userName, null); + } + + @VisibleForTesting + public MappingInput(String userName, List<String> groups) { + this(userName, groups, null); } public List<String> getGroups() { @@ -73,7 +88,7 @@ class UserPoolMapping { @Override public String toString() { - return getUserName() + "; groups " + groups; + return "{" + getUserName() + "; pool " + wmPool + "; groups " + groups + "}"; } } @@ -107,17 +122,32 @@ class UserPoolMapping { } } - public String mapSessionToPoolName(MappingInput input) { + public String mapSessionToPoolName(MappingInput input, boolean allowAnyPool, Set<String> pools) { + if (allowAnyPool && input.wmPool != null) { + return (pools == null || pools.contains(input.wmPool)) ? input.wmPool : null; + } // For equal-priority rules, user rules come first because they are more specific (arbitrary). Mapping mapping = userMappings.get(input.getUserName()); + boolean isExplicitMatch = false; + if (mapping != null) { + isExplicitMatch = isExplicitPoolMatch(input, mapping); + if (isExplicitMatch) return mapping.fullPoolName; + } for (String group : input.getGroups()) { Mapping candidate = groupMappings.get(group); if (candidate == null) continue; + isExplicitMatch = isExplicitPoolMatch(input, candidate); + if (isExplicitMatch) return candidate.fullPoolName; if (mapping == null || candidate.priority < mapping.priority) { mapping = candidate; } } + if (input.wmPool != null && !isExplicitMatch) return null; if (mapping != null) return mapping.fullPoolName; return defaultPoolPath; } + + private static boolean isExplicitPoolMatch(MappingInput input, Mapping mapping) { + return input.wmPool != null && input.wmPool.equals(mapping.fullPoolName); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 1f4843d..f0481fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -103,6 +103,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private final QueryAllocationManager allocationManager; private final String yarnQueue; private final int amRegistryTimeoutMs; + private final boolean allowAnyPool; // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool // sessions, so the pool itself could internally track the sessions it gave out, since // calling close on an unopened session is probably harmless. @@ -204,11 +205,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // Only creates the expiration tracker if expiration is configured. expirationTracker = SessionExpirationTracker.create(conf, this); - workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build()); + workPool = Executors.newFixedThreadPool(HiveConf.getIntVar( + conf, ConfVars.HIVE_SERVER2_WM_WORKER_THREADS), new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Workload management worker %d").build()); - timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Workload management timeout thread").build()); + timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Workload management timeout thread").build()); + + allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC); wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); @@ -1047,7 +1051,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private void queueGetRequestOnMasterThread( GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) { - String poolName = userPoolMapping.mapSessionToPoolName(req.mappingInput); + String poolName = userPoolMapping.mapSessionToPoolName( + req.mappingInput, allowAnyPool, allowAnyPool ? pools.keySet() : null); if (poolName == null) { req.future.setException(new NoPoolMappingException( "Cannot find any pool mapping for " + req.mappingInput)); @@ -1319,8 +1324,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + @VisibleForTesting public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { + TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + return getSession(session, input, conf, null); + } + + public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf, + final WmContext wmContext) throws Exception { WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET); // Note: not actually used for pool sessions; verify some things like doAs are not set. validateConfig(conf); @@ -1936,8 +1947,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida boolean isManaged(MappingInput input) { // This is always replaced atomically, so we don't care about concurrency here. - if (userPoolMapping != null) { - String mappedPool = userPoolMapping.mapSessionToPoolName(input); + UserPoolMapping mapping = userPoolMapping; + if (mapping != null) { + // Don't pass in the pool set - not thread safe; if the user is trying to force us to + // use a non-existent pool, we want to fail anyway. We will fail later during get. + String mappedPool = mapping.mapSessionToPoolName(input, allowAnyPool, null); LOG.info("Mapping input: {} mapped to pool: {}", input, mappedPool); return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index fc8f66a..98f5c58 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -32,7 +32,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; import java.lang.Thread.State; @@ -86,7 +85,7 @@ public class TestWorkloadManager { cdl.countDown(); } try { - session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null)); + session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf)); } catch (Throwable e) { error.compareAndSet(null, e); } @@ -227,17 +226,17 @@ public class TestWorkloadManager { TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null); + TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, new MappingInput("user", null), conf, null); + session = wm.getSession(diffPool, new MappingInput("user"), conf); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null); + TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf); assertSame(session, session2); } @@ -249,11 +248,11 @@ public class TestWorkloadManager { wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null); + TezSessionState session = wm.getSession(null, new MappingInput("user"), conf); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, new MappingInput("user", null), conf, null); + session = wm.getSession(session, new MappingInput("user"), conf); assertEquals("test", session.getQueueName()); } @@ -269,7 +268,7 @@ public class TestWorkloadManager { WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); WmTezSession session = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf, null); + null, new MappingInput("user"), conf); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); WmTezSession session2 = (WmTezSession) session.reopen(); @@ -287,10 +286,10 @@ public class TestWorkloadManager { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); + WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); + WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -301,7 +300,7 @@ public class TestWorkloadManager { qam.assertWasCalledAndReset(); // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null); + session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); @@ -322,20 +321,20 @@ public class TestWorkloadManager { assertEquals(5, wm.getNumSessions()); // Get all the 5 sessions; validate cluster fractions. WmTezSession session05of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p1", null), conf, null); + null, new MappingInput("p1"), conf); assertEquals(0.3, session05of06.getClusterFraction(), EPSILON); WmTezSession session03of06 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf, null); + null, new MappingInput("p2"), conf); assertEquals(0.18, session03of06.getClusterFraction(), EPSILON); WmTezSession session03of06_2 = (WmTezSession) wm.getSession( - null, new MappingInput("p2", null), conf, null); + null, new MappingInput("p2"), conf); assertEquals(0.09, session03of06.getClusterFraction(), EPSILON); assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON); WmTezSession session02of06 = (WmTezSession) wm.getSession( - null,new MappingInput("r1", null), conf, null); + null,new MappingInput("r1"), conf); assertEquals(0.12, session02of06.getClusterFraction(), EPSILON); WmTezSession session04 = (WmTezSession) wm.getSession( - null, new MappingInput("r2", null), conf, null); + null, new MappingInput("r2"), conf); assertEquals(0.4, session04.getClusterFraction(), EPSILON); session05of06.returnToSessionManager(); session03of06.returnToSessionManager(); @@ -347,6 +346,7 @@ public class TestWorkloadManager { @Test(timeout = 10000) public void testMappings() throws Exception { HiveConf conf = createConf(); + conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "false"); MockQam qam = new MockQam(); WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2"))); @@ -363,6 +363,31 @@ public class TestWorkloadManager { verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0"); verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1"); verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0"); + // Check explicit pool specifications - valid cases where priority is changed. + verifyMapping(wm, conf, new MappingInput("u0", groups("g1"), "g1"), "g1"); + verifyMapping(wm, conf, new MappingInput("u2", groups("g1"), "u2"), "u2"); + verifyMapping(wm, conf, new MappingInput("zzz", groups("g0", "g1"), "g1"), "g1"); + // Explicit pool specification - invalid - there's no mapping that matches. + try { + TezSessionState r = wm.getSession( + null, new MappingInput("u0", groups("g0", "g1"), "u2"), conf); + fail("Expected failure, but got " + r); + } catch (Exception ex) { + // Expected. + } + // Now allow the users to specify any pools. + conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "true"); + wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + verifyMapping(wm, conf, new MappingInput("u0", groups("g0", "g1"), "u2"), "u2"); + // The mapping that doesn't exist still shouldn't work. + try { + TezSessionState r = wm.getSession( + null, new MappingInput("u0", groups("g0", "g1"), "zzz"), conf); + fail("Expected failure, but got " + r); + } catch (Exception ex) { + // Expected. + } } private static void verifyMapping( @@ -372,6 +397,9 @@ public class TestWorkloadManager { session.returnToSessionManager(); } + + + @Test(timeout=10000) public void testQueueing() throws Exception { final HiveConf conf = createConf(); @@ -381,9 +409,9 @@ public class TestWorkloadManager { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf); final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(), sessionA4 = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); @@ -397,7 +425,7 @@ public class TestWorkloadManager { assertNull(sessionA4.get()); checkError(error); // While threads are blocked on A, we should still be able to get and return a B session. - WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null); + WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf); sessionB1.returnToSessionManager(); sessionB2.returnToSessionManager(); assertNull(sessionA3.get()); @@ -425,8 +453,8 @@ public class TestWorkloadManager { plan.getPlan().setDefaultPoolPath("A"); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), - session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + session2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalledAndReset(); @@ -448,19 +476,19 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); WmTezSession session1 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf, null); + null, new MappingInput("user"), conf); // First, try to reuse from the same pool - should "just work". WmTezSession session1a = (WmTezSession) wm.getSession( - session1, new MappingInput("user", null), conf, null); + session1, new MappingInput("user"), conf); assertSame(session1, session1a); assertEquals(1.0, session1.getClusterFraction(), EPSILON); // Should still be able to get the 2nd session. WmTezSession session2 = (WmTezSession) wm.getSession( - null, new MappingInput("user", null), conf, null); + null, new MappingInput("user"), conf); // Now try to reuse with no other sessions remaining. Should still work. WmTezSession session2a = (WmTezSession) wm.getSession( - session2, new MappingInput("user", null), conf, null); + session2, new MappingInput("user"), conf); assertSame(session2, session2a); assertEquals(0.5, session1.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); @@ -517,19 +545,19 @@ public class TestWorkloadManager { plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), - sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); assertEquals("A", sessionA2.getPoolName()); assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf); assertSame(sessionA1, sessionB1); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. // Make sure that we can still get a session from A. - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); assertEquals("A", sessionA3.getPoolName()); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); @@ -549,7 +577,7 @@ public class TestWorkloadManager { wm.start(); // One session will be running, the other will be queued in "A" - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf); assertEquals("A", sessionA1.getPoolName()); assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON); final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(); @@ -574,7 +602,7 @@ public class TestWorkloadManager { assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON); // The new session will also go to B now. sessionA2.get().returnToSessionManager(); - WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf); assertEquals("B", sessionB1.getPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); sessionA1.returnToSessionManager(); @@ -598,11 +626,11 @@ public class TestWorkloadManager { // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued. // Total: 5/6 running. - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null), - sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), - sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null), - sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null), - sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf), + sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf), + sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf), + sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf); final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(), sessionD2 = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); @@ -738,7 +766,7 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); // [A: 1, B: 0] Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -762,7 +790,7 @@ public class TestWorkloadManager { assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); // [A: 1, B: 1] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -789,7 +817,7 @@ public class TestWorkloadManager { assertEquals("B", sessionA2.getPoolName()); assertEquals("B", sessionA1.getPoolName()); - WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); // [A: 1, B: 2] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -829,7 +857,7 @@ public class TestWorkloadManager { final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); - WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders(); @@ -887,7 +915,8 @@ public class TestWorkloadManager { assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); assertEquals("B.x", sessionA1.getPoolName()); - WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0] allSessionProviders = wm.getAllSessionTriggerProviders(); assertEquals(1, allSessionProviders.get("A").getSessions().size()); @@ -986,7 +1015,7 @@ public class TestWorkloadManager { failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); try { - TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null); + TezSessionState r = wm.getSession(null, new MappingInput("A"), conf); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected. @@ -1037,7 +1066,7 @@ public class TestWorkloadManager { assertEquals(0f, oldSession.getClusterFraction(), EPSILON); pool.returnSession(theOnlySession); // Make sure we can actually get a session still - parallelism/etc. should not be affected. - WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null); + WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); assertEquals(sessionPoolName, result.getPoolName()); assertEquals(1f, result.getClusterFraction(), EPSILON); result.returnToSessionManager();
