Repository: hive
Updated Branches:
  refs/heads/master e3b10c1fc -> f9d51073a
HIVE-15732 : add the ability to restrict configuration for the queries submitted to HS2 (Tez pool) (Sergey Shelukhin, reviewed by Siddharth Seth, Lefty Leverenz) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f9d51073 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f9d51073 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f9d51073 Branch: refs/heads/master Commit: f9d51073ac02248b46c6a4800bc1ad6c7394af04 Parents: e3b10c1 Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Jan 30 16:10:40 2017 -0800 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Jan 30 16:10:40 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 38 +++++++- .../hive/ql/exec/tez/TezSessionPoolManager.java | 94 ++++++++++++++++---- 2 files changed, 113 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f9d51073/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 586e693..d19d2ea 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2364,7 +2364,14 @@ public class HiveConf extends Configuration { HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS("hive.server2.tez.sessions.init.threads", 16, "If hive.server2.tez.initialize.default.sessions is enabled, the maximum number of\n" + "threads to use to initialize the default sessions."), - + HIVE_SERVER2_TEZ_SESSION_RESTRICTED_CONFIGS("hive.server2.tez.sessions.restricted.configs", "", + "The configuration settings that cannot be set when submitting jobs to HiveServer2. 
If\n" + + "any of these are set to values different from those in the server configuration, an\n" + + "exception will be thrown."), + HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED("hive.server2.tez.sessions.custom.queue.allowed", + "true", new StringSet("true", "false", "ignore"), + "Whether Tez session pool should allow submitting queries to custom queues. The options\n" + + "are true, false (error out), ignore (accept the query but ignore the queue setting)."), // Operation log configuration HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, @@ -3331,7 +3338,7 @@ public class HiveConf extends Configuration { "See HIVE-15121 for details."); public final String varname; - private final String altName; + public final String altName; private final String defaultExpr; public final String defaultStrVal; @@ -3855,6 +3862,11 @@ public class HiveConf extends Configuration { : conf.get(var.varname, var.defaultStrVal); } + public static String getVarWithoutType(Configuration conf, ConfVars var) { + return var.altName != null ? conf.get(var.varname, conf.get(var.altName, var.defaultExpr)) + : conf.get(var.varname, var.defaultExpr); + } + public static String getTrimmedVar(Configuration conf, ConfVars var) { assert (var.valClass == String.class) : var.varname; if (var.altName != null) { @@ -4530,4 +4542,26 @@ public class HiveConf extends Configuration { + "Consider using a different execution engine (i.e. " + HiveConf.getNonMrEngines() + ") or using Hive 1.X releases."; } + + private static final Object reverseMapLock = new Object(); + private static HashMap<String, ConfVars> reverseMap = null; + + public static HashMap<String, ConfVars> getOrCreateReverseMap() { + // This should be called rarely enough; for now it's ok to just lock every time. 
+ synchronized (reverseMapLock) { + if (reverseMap != null) return reverseMap; + } + HashMap<String, ConfVars> vars = new HashMap<>(); + for (ConfVars val : ConfVars.values()) { + vars.put(val.varname.toLowerCase(), val); + if (val.altName != null && !val.altName.isEmpty()) { + vars.put(val.altName.toLowerCase(), val); + } + } + synchronized (reverseMapLock) { + if (reverseMap != null) return reverseMap; + reverseMap = vars; + return reverseMap; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f9d51073/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index d7148bb..ecac85c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -67,6 +67,12 @@ import org.apache.tez.dag.api.TezException; */ public class TezSessionPoolManager { + private enum CustomQueueAllowed { + TRUE, + FALSE, + IGNORE + } + private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class); private static final Random rdm = new Random(); @@ -80,13 +86,16 @@ public class TezSessionPoolManager { private Thread expirationThread; private Thread restartThread; - private Semaphore llapQueue; private HiveConf initConf = null; // Config settings. 
private int numConcurrentLlapQueries = -1; private long sessionLifetimeMs = 0; private long sessionLifetimeJitterMs = 0; + private CustomQueueAllowed customQueueAllowed = CustomQueueAllowed.TRUE; + private List<ConfVars> restrictedHiveConf = new ArrayList<>(); + private List<String> restrictedNonHiveConf = new ArrayList<>(); + /** A queue for initial sessions that have not been started yet. */ private Queue<TezSessionPoolSession> initialSessions = new ConcurrentLinkedQueue<TezSessionPoolSession>(); @@ -199,6 +208,31 @@ public class TezSessionPoolManager { llapQueue = new Semaphore(numConcurrentLlapQueries, true); this.initConf = conf; + String queueAllowedStr = HiveConf.getVar(initConf, + ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); + try { + this.customQueueAllowed = CustomQueueAllowed.valueOf(queueAllowedStr.toUpperCase()); + } catch (Exception ex) { + throw new RuntimeException("Invalid value '" + queueAllowedStr + "' for " + + ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname); + } + String[] restrictedConfigs = HiveConf.getTrimmedStringsVar(initConf, + ConfVars.HIVE_SERVER2_TEZ_SESSION_RESTRICTED_CONFIGS); + if (restrictedConfigs != null && restrictedConfigs.length > 0) { + HashMap<String, ConfVars> confVars = HiveConf.getOrCreateReverseMap(); + for (String confName : restrictedConfigs) { + if (confName == null || confName.isEmpty()) continue; + confName = confName.toLowerCase(); + ConfVars cv = confVars.get(confName); + if (cv != null) { + restrictedHiveConf.add(cv); + } else { + LOG.warn("A restricted config " + confName + " is not recognized as a Hive setting."); + restrictedNonHiveConf.add(confName); + } + } + } + sessionLifetimeMs = conf.getTimeVar( ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS); @@ -269,10 +303,33 @@ public class TezSessionPoolManager { return sessionState; } - private TezSessionState getSession(HiveConf conf, boolean doOpen, - boolean forceCreate) + private TezSessionState getSession(HiveConf 
conf, boolean doOpen) throws Exception { String queueName = conf.get("tez.queue.name"); + boolean hasQueue = (queueName != null) && !queueName.isEmpty(); + if (hasQueue) { + switch (customQueueAllowed) { + case FALSE: throw new HiveException("Specifying tez.queue.name is not allowed"); + case IGNORE: { + LOG.warn("User has specified " + queueName + " queue; ignoring the setting"); + queueName = null; + hasQueue = false; + conf.set("tez.queue.name", null); + } + default: // All good. + } + } + for (ConfVars var : restrictedHiveConf) { + String userValue = HiveConf.getVarWithoutType(conf, var), + serverValue = HiveConf.getVarWithoutType(initConf, var); + // Note: with some trickery, we could add logic for each type in ConfVars; for now the + // potential spurious mismatches (e.g. 0 and 0.0 for float) should be easy to work around. + validateRestrictedConfigValues(var.varname, userValue, serverValue); + } + for (String var : restrictedNonHiveConf) { + String userValue = conf.get(var), serverValue = initConf.get(var); + validateRestrictedConfigValues(var, userValue, serverValue); + } // TODO Session re-use completely disabled for doAs=true. Always launches a new session. boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); @@ -283,11 +340,9 @@ public class TezSessionPoolManager { * their own credentials. We expect that with the new security model, things will * run as user hive in most cases. 
*/ - if (forceCreate || nonDefaultUser || !hasInitialSessions - || ((queueName != null) && !queueName.isEmpty())) { - LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} hasInitialSessions: {}" + - " forceCreate: {}", queueName, nonDefaultUser, defaultQueuePool, hasInitialSessions, - forceCreate); + if (nonDefaultUser || !hasInitialSessions || hasQueue) { + LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} hasInitialSessions: {}", + queueName, nonDefaultUser, defaultQueuePool, hasInitialSessions); return getNewSessionState(conf, queueName, doOpen); } @@ -299,6 +354,16 @@ public class TezSessionPoolManager { } } + private void validateRestrictedConfigValues( + String var, String userValue, String serverValue) throws HiveException { + if ((userValue == null) != (serverValue == null) + || (userValue != null && !userValue.equals(serverValue))) { + String logValue = initConf.isHiddenConfig(var) ? "(hidden)" : serverValue; + throw new HiveException(var + " is restricted from being set; server is configured" + + " to use " + logValue + ", but the query configuration specifies " + userValue); + } + } + /** * @param conf HiveConf that is used to initialize the session * @param queueName could be null. Set in the tez session. @@ -407,11 +472,6 @@ public class TezSessionPoolManager { return new TezSessionPoolSession(sessionId, this); } - public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, - boolean llap) throws Exception { - return getSession(session, conf, doOpen, false, llap); - } - /* * This method helps to re-use a session in case there has been no change in * the configuration of a session. 
This will happen only in the case of non-hive-server2 @@ -458,8 +518,8 @@ public class TezSessionPoolManager { } } - public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, - boolean forceCreate, boolean llap) throws Exception { + public TezSessionState getSession( + TezSessionState session, HiveConf conf, boolean doOpen, boolean llap) throws Exception { if (llap && (this.numConcurrentLlapQueries > 0)) { llapQueue.acquire(); // blocks if no more llap queries can be submitted. } @@ -472,7 +532,7 @@ public class TezSessionPoolManager { closeIfNotDefault(session, false); } - return getSession(conf, doOpen, forceCreate); + return getSession(conf, doOpen); } /** Reopens the session that was found to not be running. */