Repository: hive Updated Branches: refs/heads/llap c5dc87a8e -> cc4075b53
HIVE-10647: Hive on LLAP: Limit HS2 from overwhelming LLAP (Vikram Dixit K, reviewed by Gunther Hagleitner) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc4075b5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc4075b5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc4075b5 Branch: refs/heads/llap Commit: cc4075b53006e4cfe4eae6c8d5f25b58f49fa3cc Parents: c5dc87a Author: vikram <[email protected]> Authored: Thu Jun 25 17:08:32 2015 -0700 Committer: vikram <[email protected]> Committed: Thu Jun 25 17:08:32 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../hive/ql/exec/tez/TezSessionPoolManager.java | 25 ++++++-- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +- .../hive/ql/optimizer/physical/LlapDecider.java | 8 ++- .../org/apache/hadoop/hive/ql/plan/TezWork.java | 17 +++-- .../hive/ql/exec/tez/TestTezSessionPool.java | 65 +++++++++++++++----- 6 files changed, 96 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 32944bd..2c20d51 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2241,7 +2241,9 @@ public class HiveConf extends Configuration { new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS " + "directories that are partition-like but contain unsupported characters. 'throw' (an " + "exception) is the default; 'skip' will skip the invalid directories and still repair the" + - " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"); + " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"), + HIVE_SERVER2_LLAP_CONCURRENT_QUERIES("hive.server2.llap.concurrent.queries", -1, + "The number of queries allowed in parallel via llap. Negative number implies 'infinite'."); public final String varname; http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index dfa539f..b1e9235 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -45,8 +46,10 @@ public class TezSessionPoolManager { private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class); private BlockingQueue<TezSessionState> defaultQueuePool; + private Semaphore llapQueue; private int blockingQueueLength = -1; private HiveConf initConf = null; + int numConcurrentLlapQueries = -1; private boolean inited = false; @@ -83,11 +86,15 @@ public class TezSessionPoolManager { String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); + numConcurrentLlapQueries = + conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); // the list of queues is a comma separated list. String defaultQueueList[] = defaultQueues.split(","); defaultQueuePool = new ArrayBlockingQueue<TezSessionState>(numSessions * defaultQueueList.length); + llapQueue = new Semaphore(numConcurrentLlapQueries, true); + this.initConf = conf; /* * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues) @@ -164,8 +171,11 @@ public class TezSessionPoolManager { return retTezSessionState; } - public void returnSession(TezSessionState tezSessionState) + public void returnSession(TezSessionState tezSessionState, boolean llap) throws Exception { + if (llap && (this.numConcurrentLlapQueries > 0)) { + llapQueue.release(); + } if (tezSessionState.isDefault()) { LOG.info("The session " + tezSessionState.getSessionId() + " belongs to the pool. Put it back in"); @@ -207,9 +217,9 @@ public class TezSessionPoolManager { return new TezSessionState(sessionId); } - public TezSessionState getSession( - TezSessionState session, HiveConf conf, boolean doOpen) throws Exception { - return getSession(session, conf, doOpen, false); + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean llap) throws Exception { + return getSession(session, conf, doOpen, false, llap); } /* @@ -268,8 +278,11 @@ public class TezSessionPoolManager { return true; } - public TezSessionState getSession(TezSessionState session, HiveConf conf, - boolean doOpen, boolean forceCreate) throws Exception { + public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen, + boolean forceCreate, boolean llap) throws Exception { + if (llap && (this.numConcurrentLlapQueries > 0)) { + llapQueue.acquire(); // blocks if no more llap queries can be submitted. + } if (canWorkWithSameSession(session, conf)) { return session; } http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b2558d1..7f50bea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -114,7 +114,9 @@ public class TezTask extends Task<TezWork> { // Need to remove this static hack. But this is the way currently to get a session. SessionState ss = SessionState.get(); session = ss.getTezSession(); - session = TezSessionPoolManager.getInstance().getSession(session, conf, false); + session = + TezSessionPoolManager.getInstance().getSession(session, conf, false, + getWork().getLlapMode()); ss.setTezSession(session); // jobConf will hold all the configuration for hadoop, tez, and hive @@ -173,7 +175,7 @@ public class TezTask extends Task<TezWork> { // fetch the counters Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); - TezSessionPoolManager.getInstance().returnSession(session); + TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); if (LOG.isInfoEnabled() && counters != null && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 0a22f20..d49d83e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -102,8 +102,8 @@ public class LlapDecider implements PhysicalPlanResolver { class LlapDecisionDispatcher implements Dispatcher { - private PhysicalContext pctx; - private HiveConf conf; + private final PhysicalContext pctx; + private final HiveConf conf; public LlapDecisionDispatcher(PhysicalContext pctx) { this.pctx = pctx; @@ -291,6 +291,7 @@ public class LlapDecider implements PhysicalPlanResolver { Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack<Node> s, NodeProcessorCtx c, Object... os) { return new Boolean(false); @@ -299,6 +300,7 @@ public class LlapDecider implements PhysicalPlanResolver { opRules.put(new RuleRegExp("No user code in fil", FilterOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack<Node> s, NodeProcessorCtx c, Object... os) { ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); @@ -308,6 +310,7 @@ public class LlapDecider implements PhysicalPlanResolver { opRules.put(new RuleRegExp("No user code in gby", GroupByOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack<Node> s, NodeProcessorCtx c, Object... os) { List<AggregationDesc> aggs = ((GroupByOperator)n).getConf().getAggregators(); @@ -317,6 +320,7 @@ public class LlapDecider implements PhysicalPlanResolver { opRules.put(new RuleRegExp("No user code in select", SelectOperator.getOperatorName() + "%"), new NodeProcessor() { + @Override public Object process(Node n, Stack<Node> s, NodeProcessorCtx c, Object... os) { List<ExprNodeDesc> exprs = ((SelectOperator)n).getConf().getColList(); http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 7b91002..17c5ad7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -263,7 +263,7 @@ public class TezWork extends AbstractOperatorDesc { /* * Dependency is a class used for explain - */ + */ public class Dependency implements Serializable, Comparable<Dependency> { public BaseWork w; public EdgeType type; @@ -272,7 +272,7 @@ public class TezWork extends AbstractOperatorDesc { public String getName() { return w.getName(); } - + @Explain(displayName = "Type") public String getType() { return type.toString(); @@ -306,7 +306,7 @@ public class TezWork extends AbstractOperatorDesc { } return result; } - + private static final String MR_JAR_PROPERTY = "tmpjars"; /** * Calls configureJobConf on instances of work that are part of this TezWork. @@ -349,7 +349,7 @@ public class TezWork extends AbstractOperatorDesc { /** * connect adds an edge between a and b. Both nodes have * to be added prior to calling connect. - * @param + * @param */ public void connect(BaseWork a, BaseWork b, TezEdgeProperty edgeProp) { @@ -396,4 +396,13 @@ public class TezWork extends AbstractOperatorDesc { public VertexType getVertexType(BaseWork w) { return workVertexTypeMap.get(w); } + + public boolean getLlapMode() { + for (BaseWork work : getAllWork()) { + if (work.getLlapMode()) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 37a84aa..c148aae 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -55,13 +55,13 @@ public class TestTezSessionPool { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState sessionState = poolManager.getSession(null, conf, true); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true); + TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -81,30 +81,30 @@ public class TestTezSessionPool { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(); - TezSessionState sessionState = poolManager.getSession(null, conf, true); + TezSessionState sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("b") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("c") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); - sessionState = poolManager.getSession(null, conf, true); + sessionState = poolManager.getSession(null, conf, true, false); if (sessionState.getQueueName().compareTo("a") != 0) { fail(); } - poolManager.returnSession(sessionState); + poolManager.returnSession(sessionState, false); } catch (Exception e) { e.printStackTrace(); @@ -112,8 +112,44 @@ public class TestTezSessionPool { } } + @Test + public void testLlapSessionQueuing() { + try { + random = new Random(1000); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2); + poolManager = new TestTezSessionPoolManager(); + poolManager.setupPool(conf); + poolManager.startPool(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + + List<Thread> threadList = new ArrayList<Thread>(); + for (int i = 0; i < 15; i++) { + Thread t = new Thread(new SessionThread(true)); + threadList.add(t); + t.start(); + } + + for (Thread t : threadList) { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + } + public class SessionThread implements Runnable { + private boolean llap = false; + + public SessionThread(boolean llap) { + this.llap = llap; + } + @Override public void run() { try { @@ -124,9 +160,9 @@ public class TestTezSessionPool { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true); + TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); - poolManager.returnSession(session); + poolManager.returnSession(session, llap); } catch (Exception e) { e.printStackTrace(); } @@ -150,7 +186,8 @@ public class TestTezSessionPool { } List<Thread> threadList = new ArrayList<Thread>(); for (int i = 0; i < 15; i++) { - Thread t = new Thread(new SessionThread()); + Thread t = new Thread(new SessionThread(false)); + threadList.add(t); t.start(); }
