Repository: hive
Updated Branches:
  refs/heads/master e3b10c1fc -> f9d51073a


HIVE-15732 : add the ability to restrict configuration for the queries 
submitted to HS2 (Tez pool) (Sergey Shelukhin, reviewed by Siddharth Seth, 
Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f9d51073
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f9d51073
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f9d51073

Branch: refs/heads/master
Commit: f9d51073ac02248b46c6a4800bc1ad6c7394af04
Parents: e3b10c1
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Jan 30 16:10:40 2017 -0800
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Jan 30 16:10:40 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 38 +++++++-
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 94 ++++++++++++++++----
 2 files changed, 113 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f9d51073/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 586e693..d19d2ea 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2364,7 +2364,14 @@ public class HiveConf extends Configuration {
     
HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS("hive.server2.tez.sessions.init.threads",
 16,
         "If hive.server2.tez.initialize.default.sessions is enabled, the 
maximum number of\n" +
         "threads to use to initialize the default sessions."),
-
+    
HIVE_SERVER2_TEZ_SESSION_RESTRICTED_CONFIGS("hive.server2.tez.sessions.restricted.configs",
 "",
+    "The configuration settings that cannot be set when submitting jobs to 
HiveServer2. If\n" +
+    "any of these are set to values different from those in the server 
configuration, an\n" +
+    "exception will be thrown."),
+    
HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED("hive.server2.tez.sessions.custom.queue.allowed",
+      "true", new StringSet("true", "false", "ignore"),
+      "Whether Tez session pool should allow submitting queries to custom 
queues. The options\n" +
+      "are true, false (error out), ignore (accept the query but ignore the 
queue setting)."),
 
     // Operation log configuration
     
HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled",
 true,
@@ -3331,7 +3338,7 @@ public class HiveConf extends Configuration {
             "See HIVE-15121 for details.");
 
     public final String varname;
-    private final String altName;
+    public final String altName;
     private final String defaultExpr;
 
     public final String defaultStrVal;
@@ -3855,6 +3862,11 @@ public class HiveConf extends Configuration {
       : conf.get(var.varname, var.defaultStrVal);
   }
 
+  public static String getVarWithoutType(Configuration conf, ConfVars var) {
+    String dflt = var.altName == null ? var.defaultExpr : conf.get(var.altName, var.defaultExpr);
+    return conf.get(var.varname, dflt); // Order: varname, altName (if any), defaultExpr.
+  }
+
   public static String getTrimmedVar(Configuration conf, ConfVars var) {
     assert (var.valClass == String.class) : var.varname;
     if (var.altName != null) {
@@ -4530,4 +4542,26 @@ public class HiveConf extends Configuration {
         + "Consider using a different execution engine (i.e. " + 
HiveConf.getNonMrEngines()
         + ") or using Hive 1.X releases.";
   }
+
+  private static final Object reverseMapLock = new Object();
+  private static HashMap<String, ConfVars> reverseMap = null;
+
+  /** Returns the cached map from lower-cased setting name (varname or altName) to ConfVars. */
+  public static HashMap<String, ConfVars> getOrCreateReverseMap() {
+    // Expected to be called rarely, so taking the lock on every call is acceptable for now.
+    synchronized (reverseMapLock) {
+      if (reverseMap != null) return reverseMap;
+    }
+    // Build outside the lock; if another caller publishes a map first, that one is kept.
+    HashMap<String, ConfVars> built = new HashMap<>();
+    for (ConfVars confVar : ConfVars.values()) {
+      built.put(confVar.varname.toLowerCase(), confVar);
+      String alt = confVar.altName;
+      if (alt != null && !alt.isEmpty()) built.put(alt.toLowerCase(), confVar);
+    }
+    synchronized (reverseMapLock) {
+      if (reverseMap == null) reverseMap = built;
+      return reverseMap;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f9d51073/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index d7148bb..ecac85c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -67,6 +67,12 @@ import org.apache.tez.dag.api.TezException;
  */
 public class TezSessionPoolManager {
 
+  /** How the session pool treats a query that specifies its own tez.queue.name. */
+  private enum CustomQueueAllowed {
+    // Names must equal the upper-cased config values; the setting is parsed with valueOf().
+    TRUE, FALSE, IGNORE
+  }
+
   private static final Logger LOG = 
LoggerFactory.getLogger(TezSessionPoolManager.class);
   private static final Random rdm = new Random();
 
@@ -80,13 +86,16 @@ public class TezSessionPoolManager {
   private Thread expirationThread;
   private Thread restartThread;
 
-
   private Semaphore llapQueue;
   private HiveConf initConf = null;
   // Config settings.
   private int numConcurrentLlapQueries = -1;
   private long sessionLifetimeMs = 0;
   private long sessionLifetimeJitterMs = 0;
+  private CustomQueueAllowed customQueueAllowed = CustomQueueAllowed.TRUE;
+  private List<ConfVars> restrictedHiveConf = new ArrayList<>();
+  private List<String> restrictedNonHiveConf = new ArrayList<>();
+
   /** A queue for initial sessions that have not been started yet. */
   private Queue<TezSessionPoolSession> initialSessions =
       new ConcurrentLinkedQueue<TezSessionPoolSession>();
@@ -199,6 +208,31 @@ public class TezSessionPoolManager {
     llapQueue = new Semaphore(numConcurrentLlapQueries, true);
 
     this.initConf = conf;
+    String queueAllowedStr = HiveConf.getVar(initConf,
+        ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
+    try {
+      this.customQueueAllowed = 
CustomQueueAllowed.valueOf(queueAllowedStr.toUpperCase());
+    } catch (Exception ex) {
+      throw new RuntimeException("Invalid value '" + queueAllowedStr + "' for 
" +
+          ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname);
+    }
+    String[] restrictedConfigs = HiveConf.getTrimmedStringsVar(initConf,
+        ConfVars.HIVE_SERVER2_TEZ_SESSION_RESTRICTED_CONFIGS);
+    if (restrictedConfigs != null && restrictedConfigs.length > 0) {
+      HashMap<String, ConfVars> confVars = HiveConf.getOrCreateReverseMap();
+      for (String confName : restrictedConfigs) {
+        if (confName == null || confName.isEmpty()) continue;
+        confName = confName.toLowerCase();
+        ConfVars cv = confVars.get(confName);
+        if (cv != null) {
+          restrictedHiveConf.add(cv);
+        } else {
+          LOG.warn("A restricted config " + confName + " is not recognized as 
a Hive setting.");
+          restrictedNonHiveConf.add(confName);
+        }
+      }
+    }
+    
 
     sessionLifetimeMs = conf.getTimeVar(
         ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS);
@@ -269,10 +303,33 @@ public class TezSessionPoolManager {
     return sessionState;
   }
 
-  private TezSessionState getSession(HiveConf conf, boolean doOpen,
-      boolean forceCreate)
+  private TezSessionState getSession(HiveConf conf, boolean doOpen)
       throws Exception {
     String queueName = conf.get("tez.queue.name");
+    boolean hasQueue = (queueName != null) && !queueName.isEmpty();
+    if (hasQueue) {
+      switch (customQueueAllowed) {
+      case FALSE: throw new HiveException("Specifying tez.queue.name is not 
allowed");
+      case IGNORE: {
+        LOG.warn("User has specified " + queueName + " queue; ignoring the setting");
+        queueName = null;
+        hasQueue = false;
+        conf.unset("tez.queue.name"); // Configuration.set() rejects a null value; unset instead.
+      }
+      default: // All good.
+      }
+    }
+    for (ConfVars var : restrictedHiveConf) {
+      String userValue = HiveConf.getVarWithoutType(conf, var),
+          serverValue = HiveConf.getVarWithoutType(initConf, var);
+      // Note: with some trickery, we could add logic for each type in 
ConfVars; for now the
+      // potential spurious mismatches (e.g. 0 and 0.0 for float) should be 
easy to work around.
+      validateRestrictedConfigValues(var.varname, userValue, serverValue);
+    }
+    for (String var : restrictedNonHiveConf) {
+      String userValue = conf.get(var), serverValue = initConf.get(var);
+      validateRestrictedConfigValues(var, userValue, serverValue);
+    }
 
     // TODO Session re-use completely disabled for doAs=true. Always launches 
a new session.
     boolean nonDefaultUser = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -283,11 +340,9 @@ public class TezSessionPoolManager {
      * their own credentials. We expect that with the new security model, 
things will
      * run as user hive in most cases.
      */
-    if (forceCreate || nonDefaultUser || !hasInitialSessions
-        || ((queueName != null) && !queueName.isEmpty())) {
-      LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} 
hasInitialSessions: {}" +
-              " forceCreate: {}", queueName, nonDefaultUser, defaultQueuePool, 
hasInitialSessions,
-          forceCreate);
+    if (nonDefaultUser || !hasInitialSessions || hasQueue) {
+      LOG.info("QueueName: {} nonDefaultUser: {} defaultQueuePool: {} 
hasInitialSessions: {}",
+              queueName, nonDefaultUser, defaultQueuePool, hasInitialSessions);
       return getNewSessionState(conf, queueName, doOpen);
     }
 
@@ -299,6 +354,16 @@ public class TezSessionPoolManager {
     }
   }
 
+  /** Throws if the user's value of a restricted setting differs from the server's value. */
+  private void validateRestrictedConfigValues(
+      String var, String userValue, String serverValue) throws HiveException {
+    boolean same = (userValue == null) ? (serverValue == null) : userValue.equals(serverValue);
+    if (same) return;
+    String logValue = initConf.isHiddenConfig(var) ? "(hidden)" : serverValue; // mask hidden
+    throw new HiveException(var + " is restricted from being set; server is configured"
+        + " to use " + logValue + ", but the query configuration specifies " + userValue);
+  }
+
   /**
    * @param conf HiveConf that is used to initialize the session
    * @param queueName could be null. Set in the tez session.
@@ -407,11 +472,6 @@ public class TezSessionPoolManager {
     return new TezSessionPoolSession(sessionId, this);
   }
 
-  public TezSessionState getSession(TezSessionState session, HiveConf conf, 
boolean doOpen,
-      boolean llap) throws Exception {
-    return getSession(session, conf, doOpen, false, llap);
-  }
-
   /*
    * This method helps to re-use a session in case there has been no change in
    * the configuration of a session. This will happen only in the case of 
non-hive-server2
@@ -458,8 +518,8 @@ public class TezSessionPoolManager {
     }
   }
 
-  public TezSessionState getSession(TezSessionState session, HiveConf conf, 
boolean doOpen,
-      boolean forceCreate, boolean llap) throws Exception {
+  public TezSessionState getSession(
+      TezSessionState session, HiveConf conf, boolean doOpen, boolean llap) 
throws Exception {
     if (llap && (this.numConcurrentLlapQueries > 0)) {
       llapQueue.acquire(); // blocks if no more llap queries can be submitted.
     }
@@ -472,7 +532,7 @@ public class TezSessionPoolManager {
       closeIfNotDefault(session, false);
     }
 
-    return getSession(conf, doOpen, forceCreate);
+    return getSession(conf, doOpen);
   }
 
   /** Reopens the session that was found to not be running. */

Reply via email to