Repository: hive
Updated Branches:
  refs/heads/master 4a30574d3 -> 250e10ecf
HIVE-19924: Tag distcp jobs run by Repl Load (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/250e10ec
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/250e10ec
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/250e10ec

Branch: refs/heads/master
Commit: 250e10ecf00e4b1e2536ca943be7e9068d699a6c
Parents: 4a30574
Author: Sankar Hariappan <[email protected]>
Authored: Mon Aug 13 11:16:32 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Mon Aug 13 11:16:32 2018 +0530

----------------------------------------------------------------------
 .../apache/hive/jdbc/BaseJdbcWithMiniLlap.java  |   3 +-
 .../org/apache/hive/jdbc/TestJdbcDriver2.java   |  28 ++++
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   |  66 ----------
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    | 128 ++++++++++++++++++-
 .../org/apache/hadoop/hive/ql/QueryState.java   |  27 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   5 +
 .../ql/exec/tez/KillTriggerActionHandler.java   |   3 +-
 .../hive/ql/exec/tez/WorkloadManager.java       |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  58 +++++----
 .../hadoop/hive/ql/session/KillQuery.java       |   3 +-
 .../hadoop/hive/ql/session/NullKillQuery.java   |   3 +-
 .../org/apache/hive/service/cli/CLIService.java |   2 +-
 .../hive/service/cli/operation/Operation.java   |   8 ++
 .../service/cli/operation/OperationManager.java |  35 ++++-
 .../service/cli/operation/SQLOperation.java     |   4 +-
 .../hive/service/cli/session/HiveSession.java   |   2 +
 .../service/cli/session/HiveSessionImpl.java    |   5 +
 .../hive/service/server/KillQueryImpl.java      |  88 ++++++++++++-
 19 files changed, 361 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 98f4729..d2e9514 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,7 +110,7 @@ public abstract class BaseJdbcWithMiniLlap {
   private static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
-  public static void beforeTest(HiveConf inputConf) throws Exception {
+  public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
     conf = inputConf;
     Class.forName(MiniHS2.getJdbcDriverName());
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
@@ -120,6 +120,7 @@ public abstract class BaseJdbcWithMiniLlap {
     Map<String, String> confOverlay = new HashMap<String, String>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+    return miniHS2;
   }
 
   static HiveConf defaultConf() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 8f552b0..c2f5703 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -3036,6 +3036,34 @@ public class TestJdbcDriver2 {
     }
   }
 
+  @Test
+  public void testGetQueryId() throws Exception {
+    HiveStatement stmt = (HiveStatement) con.createStatement();
+    HiveStatement stmt1 = (HiveStatement) con.createStatement();
+    stmt.executeAsync("create database query_id_test with dbproperties ('repl.source.for' = '1, 2, 3')");
+    String queryId = stmt.getQueryId();
+    assertFalse(queryId.isEmpty());
+    stmt.getUpdateCount();
+
+    stmt1.executeAsync("repl status query_id_test with ('hive.query.id' = 'hiveCustomTag')");
+    String queryId1 = stmt1.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId1));
+    assertFalse(queryId.equals(queryId1));
+    assertFalse(queryId1.isEmpty());
+    stmt1.getUpdateCount();
+
+    stmt.executeAsync("select count(*) from " + dataTypeTableName);
+    queryId = stmt.getQueryId();
+    assertFalse("hiveCustomTag".equals(queryId));
+    assertFalse(queryId.isEmpty());
+    assertFalse(queryId.equals(queryId1));
+    stmt.getUpdateCount();
+
+    stmt.execute("drop database query_id_test");
+    stmt.close();
+    stmt1.close();
+  }
+
   // Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
   @Test(expected = HiveSQLException.class)
   public void testConnectInvalidDatabase() throws SQLException {
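The new testGetQueryId() exercises HiveStatement.getQueryId() together with the 'hive.query.id' value that REPL commands accept in their WITH clause: per this patch the value becomes a query tag rather than replacing the server-generated query id. A minimal client-side sketch of that flow (the JDBC URL, credentials, dump path, and tag value are illustrative assumptions, not part of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.hive.jdbc.HiveStatement;

    public class QueryTagExample {
      public static void main(String[] args) throws Exception {
        // Assumed HS2 endpoint; adjust for your deployment.
        Connection con = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "user", "");
        HiveStatement stmt = (HiveStatement) con.createStatement();

        // Tag a REPL LOAD with a caller-chosen value; the analyzer records it as a
        // query tag (and YARN job tag) instead of overwriting the query id.
        stmt.executeAsync("repl load repl_db from '/tmp/repl_dump'"
            + " with ('hive.query.id' = 'myReplTag')");

        // The server-generated query id is still available and differs from the tag.
        String queryId = stmt.getQueryId();
        System.out.println("query id: " + queryId);

        stmt.getUpdateCount();   // block until the async statement finishes
        stmt.close();
        con.close();
      }
    }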
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 2139709..5cb0a88 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -21,7 +21,6 @@ package org.apache.hive.jdbc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1477,71 +1476,6 @@ public class TestJdbcWithMiniHS2 {
     }
   }
 
-  /**
-   * Test CLI kill command of a query that is running.
-   * We spawn 2 threads - one running the query and
-   * the other attempting to cancel.
-   * We're using a dummy udf to simulate a query,
-   * that runs for a sufficiently long time.
-   * @throws Exception
-   */
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = conTestDb;
-    Connection con2 = getConnection(testDbName);
-
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
-    final Statement stmt2 = con2.createStatement();
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
-
-    // Thread executing the query
-    Thread tExecute = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          System.out.println("Executing query: ");
-          // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
-              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
-          fail("Expecting SQLException");
-        } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
-        }
-      }
-    });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(2000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
-        }
-      }
-    });
-
-    tExecute.start();
-    tKill.start();
-    tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-
-    assertNotNull("tExecute", tExecuteHolder.throwable);
-    assertNull("tCancel", tKillHolder.throwable);
-  }
-
   private static class ExceptionHolder {
     Throwable throwable;
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index c02980b..4942ed9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -30,21 +30,55 @@ import org.apache.hadoop.io.NullWritable;
 import org.junit.BeforeClass;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Connection;
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
  * TestJdbcWithMiniLlap for Arrow format
  */
 public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
 
+  private static MiniHS2 miniHS2 = null;
+  private static final String tableName = "testJdbcMinihs2Tbl";
+  private static String dataFileDir;
+  private static final String testDbName = "testJdbcMinihs2";
+
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
+
   @BeforeClass
   public static void beforeTest() throws Exception {
     HiveConf conf = defaultConf();
     conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
-    BaseJdbcWithMiniLlap.beforeTest(conf);
+    MiniHS2.cleanupLocalDir();
+    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+
+    Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
+        System.getProperty("user.name"), "bar");
+    Statement stmt = conDefault.createStatement();
+    stmt.execute("drop database if exists " + testDbName + " cascade");
+    stmt.execute("create database " + testDbName);
+    stmt.close();
+    conDefault.close();
+  }
+
+  @AfterClass
+  public static void afterTest() {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
   }
 
   @Override
@@ -230,5 +264,95 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
+  /**
+   * SleepMsUDF
+   */
+  public static class SleepMsUDF extends UDF {
+    public Integer evaluate(int value, int ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // No-op
+      }
+      return value;
+    }
+  }
+
+  /**
+   * Test the CLI kill command on a running query.
+   * We spawn two threads - one running the query and
+   * the other attempting to cancel it.
+   * We use a dummy UDF to simulate a query that runs
+   * for a sufficiently long time.
+   * @throws Exception
+   */
+  @Test
+  public void testKillQuery() throws Exception {
+    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        System.getProperty("user.name"), "bar");
+    Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        System.getProperty("user.name"), "bar");
+
+    String udfName = SleepMsUDF.class.getName();
+    Statement stmt1 = con.createStatement();
+    final Statement stmt2 = con2.createStatement();
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+    String tblName = testDbName + "." + tableName;
+
+    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName);
+
+    stmt1.close();
+    final Statement stmt = con.createStatement();
+    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    final ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    // Thread executing the query
+    Thread tExecute = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("Executing query: ");
+          stmt.execute("set hive.llap.execution.mode = none");
+
+          // The test table has 500 rows, so total query time should be ~ 500*100ms
+          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+        } catch (SQLException e) {
+          tExecuteHolder.throwable = e;
+        }
+      }
+    });
+    // Thread killing the query
+    Thread tKill = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(5000);
+          String queryId = ((HiveStatement) stmt).getQueryId();
+          System.out.println("Killing query: " + queryId);
+          stmt2.execute("kill query '" + queryId + "'");
+          stmt2.close();
+        } catch (Exception e) {
+          tKillHolder.throwable = e;
+        }
+      }
+    });
+
+    tExecute.start();
+    tKill.start();
+    tExecute.join();
+    tKill.join();
+    stmt.close();
+    con2.close();
+    con.close();
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b1a602c..028dd60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +55,10 @@ public class QueryState {
   */
   private long numModifiedRows = 0;
 
+  // Holds the tag supplied by the user to uniquely identify the query. It can be used to kill
+  // the query if the query id cannot be retrieved for some reason, e.g. after a HiveServer2 restart.
+  private String queryTag = null;
+
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -62,6 +67,7 @@ public class QueryState {
     this.queryConf = conf;
   }
 
+  // Get the query id stored in the query-specific config.
   public String getQueryId() {
     return (queryConf.getVar(HiveConf.ConfVars.HIVEQUERYID));
   }
@@ -112,6 +118,25 @@ public class QueryState {
   public void setNumModifiedRows(long numModifiedRows) {
     this.numModifiedRows = numModifiedRows;
   }
+
+  public String getQueryTag() {
+    return queryTag;
+  }
+
+  public void setQueryTag(String queryTag) {
+    this.queryTag = queryTag;
+  }
+
+  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
+    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
+    if (jobTag == null) {
+      jobTag = queryTag;
+    } else {
+      jobTag = jobTag.concat("," + queryTag);
+    }
+    queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+  }
+
   /**
    * Builder to instantiate the QueryState object.
    */
@@ -221,6 +246,8 @@ public class QueryState {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
+        setMapReduceJobTag(queryConf, queryId);
+
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level
         if (hiveConf != null) {


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index accd7f1..467f728 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3296,7 +3296,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   private int killQuery(Hive db, KillQueryDesc desc) throws HiveException {
     SessionState sessionState = SessionState.get();
     for (String queryId : desc.getQueryIds()) {
-      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY");
+      sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY", db.getConf());
     }
     LOG.info("kill query called ({})", desc.getQueryIds());
     return 0;


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 240208a..11ef62c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -202,6 +203,10 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
       if (hiveHistory != null) {
         hiveHistory.logPlanProgress(queryPlan);
       }
+
+      if (conf != null) {
+        LOG.debug("Executing task with mapred job tags: " + conf.get(MRJobConfig.JOB_TAGS));
+      }
       int retval = execute(driverContext);
       this.setDone();
       if (hiveHistory != null) {
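setMapReduceJobTag appends the query id (or a user-supplied tag) to mapreduce.job.tags, so every MR job launched on behalf of the query - including the distcp jobs that REPL LOAD spawns - carries a tag that YARN can filter on. A standalone sketch of the resulting config behavior (the pre-existing tag value is an illustrative assumption):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class JobTagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();

        // Simulate a tag that was already present on the job config.
        conf.set(MRJobConfig.JOB_TAGS, "existingTag");

        // The patch concatenates rather than overwrites, so both tags survive.
        QueryState.setMapReduceJobTag(conf, "hive_20180813_q1");

        // Prints: existingTag,hive_20180813_q1
        System.out.println(conf.get(MRJobConfig.JOB_TAGS));
      }
    }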
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 50d234d..f357775 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -42,7 +42,8 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
           KillQuery killQuery = sessionState.getKillQuery();
           // if kill query is null then session might have been released to pool or closed already
           if (killQuery != null) {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(),
+                sessionState.getConf());
           }
         } catch (HiveException e) {
           LOG.warn("Unable to kill query {} for trigger violation");


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 7137a17..5326e35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -438,7 +438,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       WmEvent wmEvent = new WmEvent(WmEvent.EventType.KILL);
       LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
       try {
-        kq.killQuery(queryId, reason);
+        kq.killQuery(queryId, reason, toKill.getConf());
         addKillQueryResult(toKill, true);
         killCtx.killSessionFuture.set(true);
         wmEvent.endEvent(toKill);


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index adaa3d3..e4186c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
@@ -228,20 +230,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
           break;
         case TOK_REPL_CONFIG:
-          Map<String, String> replConfigs
-              = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-          if (null != replConfigs) {
-            for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-              conf.set(config.getKey(), config.getValue());
-            }
-
-            // As hive conf is changed, need to get the Hive DB again with it.
-            try {
-              db = Hive.get(conf);
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            }
-          }
+          setConfigs((ASTNode) childNode.getChild(0));
           break;
         default:
           throw new SemanticException("Unrecognized token in REPL LOAD statement");
@@ -360,6 +349,32 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
+  private void setConfigs(ASTNode node) throws SemanticException {
+    Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
+    if (null != replConfigs) {
+      for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+        String key = config.getKey();
+        // Don't overwrite the query id in the config; treat the supplied value as a query tag instead.
+        if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
+          String queryTag = config.getValue();
+          if (!StringUtils.isEmpty(queryTag)) {
+            QueryState.setMapReduceJobTag(conf, queryTag);
+          }
+          queryState.setQueryTag(queryTag);
+        } else {
+          conf.set(key, config.getValue());
+        }
+      }
+
+      // As hive conf is changed, need to get the Hive DB again with it.
+      try {
+        db = Hive.get(conf);
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+    }
+  }
+
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -371,20 +386,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
         break;
       case TOK_REPL_CONFIG:
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) childNode.getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
-          }
-
-          // As hive conf is changed, need to get the Hive DB again with it.
-          try {
-            db = Hive.get(conf);
-          } catch (HiveException e) {
-            throw new SemanticException(e);
-          }
-        }
+        setConfigs((ASTNode) childNode.getChild(0));
         break;
       default:
         throw new SemanticException("Unrecognized token in REPL STATUS statement");


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
index 2e183dc..01dc7e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java
@@ -18,8 +18,9 @@
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public interface KillQuery {
-  void killQuery(String queryId, String errMsg) throws HiveException;
+  void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException;
 }
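Taken together, setConfigs and the widened KillQuery interface let a replication client tag a REPL LOAD and later kill it - including its distcp child jobs - by that tag, even if the original query id was lost across a HiveServer2 restart. A hedged JDBC sketch of that end-to-end flow (URL, credentials, dump path, and tag value are illustrative assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    import org.apache.hive.jdbc.HiveStatement;

    public class ReplLoadKillByTag {
      public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "user", "");
        HiveStatement loadStmt = (HiveStatement) con.createStatement();

        // The 'hive.query.id' value is not applied as the query id; the analyzer
        // records it as a query tag and stamps it onto the MR jobs (e.g. distcp)
        // that the load spawns.
        loadStmt.executeAsync("repl load repl_db from '/tmp/repl_dump'"
            + " with ('hive.query.id' = 'replLoadTag1')");

        // From another session -- even one opened after an HS2 restart -- the same
        // tag can be handed to KILL QUERY; KillQueryImpl resolves it and also kills
        // the tagged child YARN applications.
        try (Connection con2 = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "user", "");
             Statement killStmt = con2.createStatement()) {
          killStmt.execute("kill query 'replLoadTag1'");
        }

        loadStmt.close();
        con.close();
      }
    }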
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
index b62f22c..eac2936 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java
@@ -18,11 +18,12 @@
 package org.apache.hadoop.hive.ql.session;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 public class NullKillQuery implements KillQuery {
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     // Do nothing
   }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/CLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index e28e513..9cbe7e1 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -624,7 +624,7 @@ public class CLIService extends CompositeService implements ICLIService {
   public String getQueryId(TOperationHandle opHandle) throws HiveSQLException {
     Operation operation = sessionManager.getOperationManager().getOperation(
         new OperationHandle(opHandle));
-    final String queryId = operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
+    final String queryId = operation.getQueryId();
     LOG.debug(opHandle + ": getQueryId() " + queryId);
     return queryId;
   }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 1ee0756..4b9cbd3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -413,4 +413,12 @@ public abstract class Operation {
   protected void markOperationCompletedTime() {
     operationComplete = System.currentTimeMillis();
   }
+
+  public String getQueryTag() {
+    return queryState.getQueryTag();
+  }
+
+  public String getQueryId() {
+    return queryState.getQueryId();
+  }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 5336034..8db6a29 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -62,6 +62,7 @@ public class OperationManager extends AbstractService {
       new ConcurrentHashMap<OperationHandle, Operation>();
   private final ConcurrentHashMap<String, Operation> queryIdOperation =
       new ConcurrentHashMap<String, Operation>();
+  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -201,11 +202,32 @@ public class OperationManager extends AbstractService {
     }
   }
 
+  public void updateQueryTag(String queryId, String queryTag) {
+    Operation operation = queryIdOperation.get(queryId);
+    if (operation != null) {
+      String queryIdTemp = queryTagToIdMap.get(queryTag);
+      if (queryIdTemp != null) {
+        throw new RuntimeException("tag " + queryTag + " is already applied for query " + queryIdTemp);
+      }
+      queryTagToIdMap.put(queryTag, queryId);
+      LOG.info("Query " + queryId + " is updated with tag " + queryTag);
+      return;
+    }
+    LOG.info("Query id is missing during query tag update");
+  }
+
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
+    if (operation == null) {
+      throw new RuntimeException("Operation does not exist: " + opHandle);
+    }
     String queryId = getQueryId(operation);
     queryIdOperation.remove(queryId);
-    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
+    String queryTag = operation.getQueryTag();
+    if (queryTag != null) {
+      queryTagToIdMap.remove(queryTag);
+    }
+    LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId, opHandle, queryTag);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
@@ -285,9 +307,6 @@ public class OperationManager extends AbstractService {
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     LOG.info("Closing operation: " + opHandle);
     Operation operation = removeOperation(opHandle);
-    if (operation == null) {
-      throw new HiveSQLException("Operation does not exist: " + opHandle);
-    }
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
       try {
@@ -422,4 +441,12 @@ public class OperationManager extends AbstractService {
   public Operation getOperationByQueryId(String queryId) {
     return queryIdOperation.get(queryId);
   }
+
+  public Operation getOperationByQueryTag(String queryTag) {
+    String queryId = queryTagToIdMap.get(queryTag);
+    if (queryId != null) {
+      return getOperationByQueryId(queryId);
+    }
+    return null;
+  }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9a07fa1..36df57e 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -198,7 +198,9 @@ public class SQLOperation extends ExecuteStatementOperation {
       if (0 != response.getResponseCode()) {
        throw toSQLException("Error while compiling statement", response);
       }
-
+      if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
+        parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
+      }
       setHasResultSet(driver.hasResultSet());
     } catch (HiveSQLException e) {
       setState(OperationState.ERROR);


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index b4070ce..cce9c22 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -200,6 +200,8 @@ public interface HiveSession extends HiveSessionBase {
 
   void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
+  void updateQueryTag(String queryId, String queryTag) throws HiveSQLException;
+
   void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
   TableSchema getResultSetMetadata(OperationHandle opHandle)
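OperationManager now keeps a second map from tag to query id, populated at compile time via SQLOperation -> HiveSession.updateQueryTag. A small sketch of how server-side code can resolve either identifier (the method names come from this patch; the helper class itself is illustrative):

    import org.apache.hive.service.cli.operation.Operation;
    import org.apache.hive.service.cli.operation.OperationManager;

    public class OperationLookup {
      // Resolve an operation by query id first, then fall back to the user-supplied
      // tag, mirroring the lookup order KillQueryImpl.killQuery uses in this patch.
      public static Operation findOperation(OperationManager manager, String idOrTag) {
        Operation op = manager.getOperationByQueryId(idOrTag);
        if (op == null) {
          op = manager.getOperationByQueryTag(idOrTag);
        }
        return op;
      }
    }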
http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b9a8537..e5cdc7b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -874,6 +874,11 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
+  public void updateQueryTag(String queryId, String queryTag) throws HiveSQLException {
+    sessionManager.getOperationManager().updateQueryTag(queryId, queryTag);
+  }
+
+  @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
     acquire(true, false);
     try {


http://git-wip-us.apache.org/repos/asf/hive/blob/250e10ec/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index b39a7b1..490a04d 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -18,8 +18,20 @@
 package org.apache.hive.service.server;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
 import org.apache.hive.service.cli.operation.Operation;
@@ -27,6 +39,12 @@ import org.apache.hive.service.cli.operation.OperationManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
@@ -36,18 +54,82 @@ public class KillQueryImpl implements KillQuery {
     this.operationManager = operationManager;
   }
 
+  public static Set<ApplicationId> getChildYarnJobs(Configuration conf, String tag) throws IOException, YarnException {
+    Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+    GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+    gar.setScope(ApplicationsRequestScope.OWN);
+    gar.setApplicationTags(Collections.singleton(tag));
+
+    ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
+    GetApplicationsResponse apps = proxy.getApplications(gar);
+
+    List<ApplicationReport> appsList = apps.getApplicationList();
+    for (ApplicationReport appReport : appsList) {
+      childYarnJobs.add(appReport.getApplicationId());
+    }
+
+    if (childYarnJobs.isEmpty()) {
+      LOG.info("No child applications found");
+    } else {
+      LOG.info("Found child YARN applications: " + StringUtils.join(childYarnJobs, ","));
+    }
+
+    return childYarnJobs;
+  }
+
+  public static void killChildYarnJobs(Configuration conf, String tag) {
+    try {
+      if (tag == null) {
+        return;
+      }
+      Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
+      if (!childYarnJobs.isEmpty()) {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(conf);
+        yarnClient.start();
+        for (ApplicationId app : childYarnJobs) {
+          yarnClient.killApplication(app);
+        }
+      }
+    } catch (IOException | YarnException ye) {
+      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+    }
+  }
+
   @Override
-  public void killQuery(String queryId, String errMsg) throws HiveException {
+  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
     try {
+      String queryTag = null;
+
       Operation operation = operationManager.getOperationByQueryId(queryId);
       if (operation == null) {
-        LOG.info("Query not found: " + queryId);
+        // Check whether the user has passed a query tag rather than a query id. This can happen
+        // when the application is restarted and no longer knows the proper query id; the tag can
+        // then be used to kill the query.
+        operation = operationManager.getOperationByQueryTag(queryId);
+        if (operation == null) {
+          LOG.info("Query not found: " + queryId);
+        }
       } else {
+        // This is the normal flow: the query is tagged and the user kills it by query id.
+        queryTag = operation.getQueryTag();
+      }
+
+      if (queryTag == null) {
+        // Use the query id as the tag if the caller only wants to kill the YARN jobs after a
+        // HiveServer2 restart. YARN jobs are tagged with the query id by default, so a restarted
+        // application can pass either the query id or its custom tag here.
+        queryTag = queryId;
+      }
+
+      LOG.info("Killing yarn jobs for query id: " + queryId + " using tag: " + queryTag);
+      killChildYarnJobs(conf, queryTag);
+
+      if (operation != null) {
         OperationHandle handle = operation.getHandle();
         operationManager.cancelOperation(handle, errMsg);
       }
     } catch (HiveSQLException e) {
-      throw new HiveException(e);
+      LOG.error("Kill query failed for query " + queryId, e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 }
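getChildYarnJobs asks the ResourceManager for the caller's applications filtered by tag, and killChildYarnJobs kills each match. Since both helpers are public static, other server-side code can reuse them directly; a standalone sketch (the tag value is an illustrative assumption - in practice it is a query id or the value passed via WITH ('hive.query.id' = ...)):

    import java.util.Set;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hive.service.server.KillQueryImpl;

    public class KillAppsByTag {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();

        // List the caller's YARN applications carrying the tag.
        Set<ApplicationId> apps = KillQueryImpl.getChildYarnJobs(conf, "replLoadTag1");
        System.out.println("Tagged applications: " + apps);

        // Kill them, exactly as KILL QUERY does for a tagged query's child jobs.
        KillQueryImpl.killChildYarnJobs(conf, "replLoadTag1");
      }
    }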
