Author: vikram Date: Tue May 27 18:27:51 2014 New Revision: 1597845 URL: http://svn.apache.org/r1597845 Log: HIVE-7043: When using the tez session pool via hive, once sessions time out, all queries go to the default queue (Vikram Dixit, reviewed by Gunther Hagleitner)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1597845&r1=1597844&r2=1597845&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Tue May 27 18:27:51 2014 @@ -78,7 +78,7 @@ public class TezJobMonitor { try { for (TezSessionState s: TezSessionState.getOpenSessions()) { System.err.println("Shutting down tez session."); - s.close(false); + TezSessionPoolManager.getInstance().close(s); } } catch (Exception e) { // ignore Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1597845&r1=1597844&r2=1597845&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Tue May 27 18:27:51 2014 @@ -262,9 +262,19 @@ public class TezSessionPoolManager { } if (session != null) { - session.close(false); + close(session); } return getSession(conf, doOpen, forceCreate); } + + public void closeAndOpen(TezSessionState sessionState, HiveConf conf) + throws Exception { + HiveConf sessionConf = sessionState.getConf(); + if 
(sessionConf.get("tez.queue.name") != null) { + conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); + } + close(sessionState); + sessionState.open(conf); + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1597845&r1=1597844&r2=1597845&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue May 27 18:27:51 2014 @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -28,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import javax.security.auth.login.LoginException; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -39,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.Op import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; @@ -57,7 +52,6 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -297,8 +291,7 @@ public class TezTask extends Task<TezWor DAGClient 
submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSessionState sessionState) - throws IOException, TezException, InterruptedException, - LoginException, URISyntaxException, HiveException { + throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; @@ -310,11 +303,7 @@ public class TezTask extends Task<TezWor console.printInfo("Tez session was closed. Reopening..."); // close the old one, but keep the tmp files around - sessionState.close(true); - - // (re)open the session - sessionState.open(this.conf); - + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1597845&r1=1597844&r2=1597845&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Tue May 27 18:27:51 2014 @@ -29,14 +29,11 @@ import static org.mockito.Mockito.verify import static org.mockito.Mockito.when; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import javax.security.auth.login.LoginException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -61,7 +58,6 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.SessionNotRunning; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; 
import org.apache.tez.dag.api.client.DAGClient; import org.junit.After; @@ -202,8 +198,7 @@ public class TestTezTask { } @Test - public void testSubmit() throws LoginException, IllegalArgumentException, - IOException, TezException, InterruptedException, URISyntaxException, HiveException { + public void testSubmit() throws Exception { DAG dag = new DAG("test"); task.submit(conf, dag, path, appLr, sessionState); // validate close/reopen