This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
     new b1503b5123f HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789)
b1503b5123f is described below
commit b1503b5123fde96cf7a7583e41a70083c704b3cd
Author: Aman Raj <[email protected]>
AuthorDate: Mon Oct 16 13:31:43 2023 +0530
HIVE-27784: Backport of HIVE-20364, HIVE-20549 to branch-3 (#4789)
* HIVE-20364: Update default for hive.map.aggr.hash.min.reduction
* HIVE-20549: Allow user set query tag, and kill query with tag (Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin)
* Removed explainanalyze_2.q test; to be fixed in HIVE-27795
---------
Co-authored-by: Ashutosh Chauhan <[email protected]>
Co-authored-by: Mahesh Kumar Behera <[email protected]>
Co-authored-by: Daniel Dai <[email protected]>
Signed-off-by: Sankar Hariappan <[email protected]>
Closes (#4789)
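
For context, the HIVE-20549 change backported here lets one session tag its
queries via hive.query.tag and lets another session kill them by that tag. A
minimal JDBC sketch of the intended workflow (host, port, user names, and the
tag value are illustrative placeholders, not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class KillByTagSketch {
      public static void main(String[] args) throws Exception {
        // Session 1: tag subsequent queries, then run the long statement.
        try (Connection run = DriverManager.getConnection(
                 "jdbc:hive2://hs2-host:10000/default", "alice", "");
             Statement s1 = run.createStatement()) {
          s1.execute("set hive.query.tag = nightly_etl");
          // the long-running query would be submitted here
        }
        // Session 2: an admin (or the tag owner) kills everything under the tag.
        try (Connection kill = DriverManager.getConnection(
                 "jdbc:hive2://hs2-host:10000/default", "admin_user", "");
             Statement s2 = kill.createStatement()) {
          s2.execute("set role admin"); // needed only when killing another user's queries
          s2.execute("kill query 'nightly_etl'");
        }
      }
    }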
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 153 +++++++++++++++------
.../test/resources/testconfiguration.properties | 5 +-
.../java/org/apache/hive/jdbc/HiveStatement.java | 6 +-
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 7 +-
.../java/org/apache/hadoop/hive/ql/QueryState.java | 23 +++-
.../hive/ql/exec/tez/KillTriggerActionHandler.java | 5 +
.../hadoop/hive/ql/exec/tez/WorkloadManager.java | 3 +
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 2 +-
.../clientnegative/authorization_kill_query.q | 15 --
.../service/cli/operation/OperationManager.java | 29 ++--
.../apache/hive/service/server/KillQueryImpl.java | 112 +++++++++++----
12 files changed, 257 insertions(+), 110 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bf20a78b588..6bd226c442f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1519,6 +1519,10 @@ public class HiveConf extends Configuration {
HIVEQUERYID("hive.query.id", "",
"ID for query being executed (might be multiple per a session)"),
+    HIVEQUERYTAG("hive.query.tag", null, "Tag for the queries in the session. User can kill the queries with the tag " +
+        "in another session. Currently there is no tag duplication check, user need to make sure his tag is unique. " +
+        "Also 'kill query' needs to be issued to all HiveServer2 instances to proper kill the queries"),
+
HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),
// hive jar
@@ -1688,7 +1692,7 @@ public class HiveConf extends Configuration {
"How many rows with the same key value should be cached in memory per
smb joined table."),
HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000,
"Number of rows after which size of the grouping keys/aggregation
classes is performed"),
- HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5,
+ HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.99,
"Portion of total memory to be used by map-side group aggregation hash
table"),
HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory",
(float) 0.3,
"Portion of total memory to be used by map-side group aggregation hash
table, when this group by is followed by map join"),
@@ -5451,6 +5455,7 @@ public class HiveConf extends Configuration {
ConfVars.SHOW_JOB_FAIL_DEBUG_INFO.varname,
ConfVars.TASKLOG_DEBUG_TIMEOUT.varname,
ConfVars.HIVEQUERYID.varname,
+ ConfVars.HIVEQUERYTAG.varname,
};
/**
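
The new hive.query.tag description above warns that KILL QUERY only reaches the
HiveServer2 instance it is issued against, so a caller fronted by several
instances has to fan the statement out itself. A hedged sketch of that fan-out
(the endpoint list and credentials are assumptions; real deployments might
resolve instances via ZooKeeper discovery):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class KillTagOnAllInstances {
      // Hypothetical HS2 endpoints; not part of this commit.
      private static final String[] HS2_URLS = {
          "jdbc:hive2://hs2-a:10000/default",
          "jdbc:hive2://hs2-b:10000/default"
      };

      public static void main(String[] args) throws Exception {
        String tag = "nightly_etl";
        for (String url : HS2_URLS) {
          try (Connection c = DriverManager.getConnection(url, "admin_user", "");
               Statement s = c.createStatement()) {
            s.execute("kill query '" + tag + "'");
          } catch (Exception e) {
            // Keep fanning out even if one instance is unreachable.
            System.err.println("kill on " + url + " failed: " + e.getMessage());
          }
        }
      }
    }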
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 3dcc4928b1a..dcb8701e696 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -43,9 +43,12 @@ import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TestJdbcWithMiniLlap for Arrow format
@@ -57,6 +60,7 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
private static final String tableName = "testJdbcMinihs2Tbl";
private static String dataFileDir;
private static final String testDbName = "testJdbcMinihs2";
+ private static final String tag = "mytag";
private static class ExceptionHolder {
Throwable throwable;
@@ -66,6 +70,12 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
public static void beforeTest() throws Exception {
HiveConf conf = defaultConf();
conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setVar(ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security" +
+        ".SessionStateUserAuthenticator");
+    conf.setVar(ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name"));
+    conf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
+
+    conf.setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST_APPEND, ConfVars.HIVE_SUPPORT_CONCURRENCY
+        .varname + "|" + ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname);
MiniHS2.cleanupLocalDir();
miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
@@ -73,8 +83,19 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
    Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
        System.getProperty("user.name"), "bar");
Statement stmt = conDefault.createStatement();
+ String tblName = testDbName + "." + tableName;
+ Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+ String udfName = SleepMsUDF.class.getName();
stmt.execute("drop database if exists " + testDbName + " cascade");
stmt.execute("create database " + testDbName);
+ stmt.execute("set role admin");
+ stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt");
+ stmt.execute("use " + testDbName);
+ stmt.execute("create table " + tblName + " (int_col int, value string) ");
+ stmt.execute("load data inpath 'kv1.txt' into table " + tblName);
+ stmt.execute("create function sleepMsUDF as '" + udfName + "'");
+ stmt.execute("grant select on table " + tblName + " to role public");
+
stmt.close();
conDefault.close();
}
@@ -291,29 +312,16 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
* that runs for a sufficiently long time.
* @throws Exception
*/
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-        System.getProperty("user.name"), "bar");
+  private void testKillQueryInternal(String user, String killUser, boolean useTag, final
+      ExceptionHolder stmtHolder, final ExceptionHolder tKillHolder) throws Exception {
+    Connection con1 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        user, "bar");
    Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-        System.getProperty("user.name"), "bar");
+        killUser, "bar");
- String udfName = SleepMsUDF.class.getName();
- Statement stmt1 = con.createStatement();
final Statement stmt2 = con2.createStatement();
- Path dataFilePath = new Path(dataFileDir, "kv1.txt");
-
- String tblName = testDbName + "." + tableName;
-
- stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
- stmt1.execute("create table " + tblName + " (int_col int, value string) ");
- stmt1.execute("load data local inpath '" + dataFilePath.toString() + "'
into table " + tblName);
-
-
- stmt1.close();
- final Statement stmt = con.createStatement();
- final ExceptionHolder tExecuteHolder = new ExceptionHolder();
- final ExceptionHolder tKillHolder = new ExceptionHolder();
+ final HiveStatement stmt1 = (HiveStatement)con1.createStatement();
+ final StringBuffer stmtQueryId = new StringBuffer();
// Thread executing the query
Thread tExecute = new Thread(new Runnable() {
@@ -321,43 +329,104 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
public void run() {
try {
System.out.println("Executing query: ");
- stmt.execute("set hive.llap.execution.mode = none");
+ stmt1.execute("set hive.llap.execution.mode = none");
+ if (useTag) {
+ stmt1.execute("set hive.query.tag = " + tag);
+ }
        // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+          stmt1.executeAsync("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
              "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+ stmtQueryId.append(stmt1.getQueryId());
+ stmt1.getUpdateCount();
} catch (SQLException e) {
- tExecuteHolder.throwable = e;
+ stmtHolder.throwable = e;
}
}
});
- // Thread killing the query
- Thread tKill = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);
- String queryId = ((HiveStatement) stmt).getQueryId();
- System.out.println("Killing query: " + queryId);
- stmt2.execute("kill query '" + queryId + "'");
- stmt2.close();
- } catch (Exception e) {
- tKillHolder.throwable = e;
+
+ tExecute.start();
+
+ // wait for other thread to create the stmt handle
+ int count = 0;
+ while (count < 15) {
+ try {
+ tKillHolder.throwable = null;
+ Thread.sleep(2000);
+ String queryId;
+ if (useTag) {
+ queryId = tag;
+ } else {
+ if (stmtQueryId.length() != 0) {
+ queryId = stmtQueryId.toString();
+ } else {
+ count++;
+ continue;
+ }
}
+ System.out.println("Killing query: " + queryId);
+ if (killUser.equals(System.getProperty("user.name"))) {
+ stmt2.execute("set role admin");
+ }
+ stmt2.execute("kill query '" + queryId + "'");
+ stmt2.close();
+ break;
+ } catch (SQLException e) {
+ count++;
+ tKillHolder.throwable = e;
}
- });
+ }
- tExecute.start();
- tKill.start();
tExecute.join();
- tKill.join();
- stmt.close();
- con2.close();
- con.close();
+ try {
+ stmt1.close();
+ con1.close();
+ con2.close();
+ } catch (Exception e) {
+ // ignore error
+ }
+ }
+ @Test
+ @Override
+ public void testKillQuery() throws Exception {
+ testKillQueryById();
+ testKillQueryByTagNegative();
+ testKillQueryByTagAdmin();
+ testKillQueryByTagOwner();
+ }
+
+ public void testKillQueryById() throws Exception {
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false,
+        tExecuteHolder, tKillHolder);
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ public void testKillQueryByTagNegative() throws Exception {
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+ testKillQueryInternal("user1", "user2", true, tExecuteHolder, tKillHolder);
+ assertNotNull("tCancel", tKillHolder.throwable);
+    assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains("No privilege"));
+ }
+
+ public void testKillQueryByTagAdmin() throws Exception {
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+ testKillQueryInternal("user1", System.getProperty("user.name"), true,
tExecuteHolder, tKillHolder);
assertNotNull("tExecute", tExecuteHolder.throwable);
assertNull("tCancel", tKillHolder.throwable);
}
+ public void testKillQueryByTagOwner() throws Exception {
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+ testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder);
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
}
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 144a5a8ad48..16a3e082d99 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -36,7 +36,8 @@ disabled.query.files=ql_rewrite_gbtoidx.q,\
union_stats.q,\
sample2.q,\
sample4.q,\
- sample6.q
+ sample6.q, \
+ explainanalyze_2.q
# NOTE: Add tests to minitez only if it is very
@@ -437,6 +438,7 @@ minillap.query.files=acid_bucket_pruning.q,\
multi_count_distinct_null.q,\
cttl.q
+# explainanalyze_2.q to be fixed in HIVE-27795
minillaplocal.query.files=\
dec_str.q,\
dp_counter_non_mm.q,\
@@ -527,7 +529,6 @@ minillaplocal.query.files=\
escape1.q,\
escape2.q,\
exchgpartition2lel.q,\
- explainanalyze_2.q,\
explainuser_1.q,\
explainuser_4.q,\
external_jdbc_auth.q,\
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 06542cee02e..be5079fd7bb 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -997,8 +997,12 @@ public class HiveStatement implements java.sql.Statement {
@VisibleForTesting
public String getQueryId() throws SQLException {
+    TOperationHandle stmtHandleTemp = stmtHandle; // cache it, as it might get modified by other thread.
+ if (stmtHandleTemp == null) {
+ throw new SQLException("stmtHandle is null");
+ }
try {
- return client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId();
+      return client.GetQueryId(new TGetQueryIdReq(stmtHandleTemp)).getQueryId();
} catch (TException e) {
throw new SQLException(e);
}
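
The null guard above exists because getQueryId() can now be called from a
thread other than the one driving executeAsync(), before the operation handle
is set. The updated test copes by polling; a condensed sketch of that pattern
(connection details and the query are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import org.apache.hive.jdbc.HiveStatement;

    public class QueryIdPollingSketch {
      public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection(
            "jdbc:hive2://hs2-host:10000/default", "alice", "");
        HiveStatement stmt = (HiveStatement) con.createStatement();
        stmt.executeAsync("select count(*) from some_table"); // returns before the query finishes

        String queryId = null;
        for (int i = 0; i < 15 && queryId == null; i++) {
          try {
            queryId = stmt.getQueryId(); // with this patch, throws SQLException while stmtHandle is still null
          } catch (SQLException e) {
            Thread.sleep(1000); // handle not created yet, retry
          }
        }
        System.out.println("query id: " + queryId);
        stmt.close();
        con.close();
      }
    }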
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 270ab6e9030..ae38aa4ef22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -701,7 +701,12 @@ public class Driver implements IDriver {
try {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
- doAuthorization(queryState.getHiveOperation(), sem, command);
+      // Authorization check for kill query is done in KillQueryImpl,
+      // as both the admin and the operation owner can perform the operation,
+      // which is not directly supported in the authorizer.
+ if (queryState.getHiveOperation() != HiveOperation.KILL_QUERY) {
+ doAuthorization(queryState.getHiveOperation(), sem, command);
+ }
} catch (AuthorizationException authExp) {
console.printError("Authorization failed:" + authExp.getMessage()
+ ". Use SHOW GRANT to get more details.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index b6f069966e6..a06dd485cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
/**
 * The class to store query level info such as queryId. Multiple queries can run
@@ -54,6 +56,11 @@ public class QueryState {
// id cannot be queried for some reason like hive server restart.
private String queryTag = null;
+ static public final String USERID_TAG = "userid";
+ /**
+ * Holds the number of rows affected for insert queries.
+ */
+ private long numModifiedRows = 0;
/**
* Private constructor, use QueryState.Builder instead.
* @param conf The query specific configuration object
@@ -107,21 +114,25 @@ public class QueryState {
}
public String getQueryTag() {
- return queryTag;
+ return HiveConf.getVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
}
public void setQueryTag(String queryTag) {
- this.queryTag = queryTag;
+ HiveConf.setVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG, queryTag);
}
- public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
- String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
- if (jobTag == null) {
+ public static void setApplicationTag(HiveConf queryConf, String queryTag) {
+ String jobTag = HiveConf.getVar(queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
+ if (jobTag == null || jobTag.isEmpty()) {
jobTag = queryTag;
} else {
jobTag = jobTag.concat("," + queryTag);
}
+ if (SessionState.get() != null) {
+      jobTag = jobTag.concat("," + USERID_TAG + "=" + SessionState.get().getUserName());
+ }
queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+ queryConf.set(TezConfiguration.TEZ_APPLICATION_TAGS, jobTag);
}
/**
@@ -233,7 +244,7 @@ public class QueryState {
if (generateNewQueryId) {
String queryId = QueryPlan.makeQueryId();
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
- setMapReduceJobTag(queryConf, queryId);
+ setApplicationTag(queryConf, queryId);
      // FIXME: druid storage handler relies on query.id to maintain some staging directories
// expose queryid to session level
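
To make the renamed setApplicationTag concrete: it now starts from the
session's hive.query.tag (falling back to the passed query id), appends a
userid marker when a SessionState exists, and publishes the result under both
the MapReduce and Tez tag keys. A hedged sketch against the patched QueryState
(the tag and id values are examples):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class TagCompositionSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVEQUERYTAG, "mytag"); // session-level tag
        QueryState.setApplicationTag(conf, "hive_20231016133143_example"); // normally the generated query id
        // With a SessionState started for user "alice" this would print
        //   mytag,hive_20231016133143_example,userid=alice
        // and tez.application.tags is set to the same value; without a
        // SessionState the userid suffix is omitted.
        System.out.println(conf.get(MRJobConfig.JOB_TAGS));
      }
    }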
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index f357775c866..ee539ba1763 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.exec.tez;
import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.slf4j.Logger;
@@ -39,6 +41,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
TezSessionState sessionState = entry.getKey();
String queryId = sessionState.getWmContext().getQueryId();
try {
+ SessionState ss = new SessionState(new HiveConf());
+ ss.setIsHiveServerQuery(true);
+ SessionState.start(ss);
KillQuery killQuery = sessionState.getKillQuery();
        // if kill query is null then session might have been released to pool or closed already
if (killQuery != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 9029285835c..2478ab9fc67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -469,6 +469,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
final String reason = killCtx.reason;
LOG.info("Killing query for {}", toKill);
workPool.submit(() -> {
+ SessionState ss = new SessionState(new HiveConf());
+ ss.setIsHiveServerQuery(true);
+ SessionState.start(ss);
      // Note: we get query ID here, rather than in the caller, where it would be more correct
      // because we know which exact query we intend to kill. This is valid because we
      // are not expecting query ID to change - we never reuse the session for which a
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 42e0339d422..f83146125f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -359,7 +359,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
String queryTag = config.getValue();
if (!StringUtils.isEmpty(queryTag)) {
- QueryState.setMapReduceJobTag(conf, queryTag);
+ QueryState.setApplicationTag(conf, queryTag);
}
queryState.setQueryTag(queryTag);
} else {
diff --git a/ql/src/test/queries/clientnegative/authorization_kill_query.q b/ql/src/test/queries/clientnegative/authorization_kill_query.q
deleted file mode 100644
index 5379f877644..00000000000
--- a/ql/src/test/queries/clientnegative/authorization_kill_query.q
+++ /dev/null
@@ -1,15 +0,0 @@
-set hive.security.authorization.enabled=true;
-set hive.test.authz.sstd.hs2.mode=true;
-set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest;
-set hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
-
-set user.name=hive_admin_user;
-set role ADMIN;
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';
-
-set user.name=ruser1;
-
--- kill query as non-admin should fail
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 2a776057b72..0f6864a8b66 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -22,12 +22,18 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.MultimapBuilder;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -62,7 +68,8 @@ public class OperationManager extends AbstractService {
new ConcurrentHashMap<OperationHandle, Operation>();
private final ConcurrentHashMap<String, Operation> queryIdOperation =
new ConcurrentHashMap<String, Operation>();
-  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
+  private final SetMultimap<String, String> queryTagToIdMap =
+      Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());
//Following fields for displaying queries on WebUI
private Object webuiLock = new Object();
@@ -205,12 +212,7 @@ public class OperationManager extends AbstractService {
public void updateQueryTag(String queryId, String queryTag) {
Operation operation = queryIdOperation.get(queryId);
if (operation != null) {
- String queryIdTemp = queryTagToIdMap.get(queryTag);
- if (queryIdTemp != null) {
- throw new RuntimeException("tag " + queryTag + " is already applied
for query " + queryIdTemp);
- }
queryTagToIdMap.put(queryTag, queryId);
- LOG.info("Query " + queryId + " is updated with tag " + queryTag);
return;
}
LOG.info("Query id is missing during query tag updation");
@@ -225,7 +227,7 @@ public class OperationManager extends AbstractService {
queryIdOperation.remove(queryId);
String queryTag = operation.getQueryTag();
if (queryTag != null) {
- queryTagToIdMap.remove(queryTag);
+ queryTagToIdMap.remove(queryTag, queryId);
}
LOG.info("Removed queryId: {} corresponding to operation: {} with tag:
{}", queryId, opHandle, queryTag);
if (operation instanceof SQLOperation) {
@@ -442,11 +444,14 @@ public class OperationManager extends AbstractService {
return queryIdOperation.get(queryId);
}
- public Operation getOperationByQueryTag(String queryTag) {
- String queryId = queryTagToIdMap.get(queryTag);
- if (queryId != null) {
- return getOperationByQueryId(queryId);
+ public Set<Operation> getOperationsByQueryTag(String queryTag) {
+ Set<String> queryIds = queryTagToIdMap.get(queryTag);
+ Set<Operation> result = new HashSet<Operation>();
+ for (String queryId : queryIds) {
+ if (queryId != null && getOperationByQueryId(queryId) != null) {
+ result.add(getOperationByQueryId(queryId));
+ }
}
- return null;
+ return result;
}
}
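
The switch from a plain map to a synchronized SetMultimap is what allows
several concurrent queries to register the same tag (the old code threw on a
duplicate). A standalone sketch of the same Guava construction used above
(the tag and id strings are examples):

    import com.google.common.collect.MultimapBuilder;
    import com.google.common.collect.Multimaps;
    import com.google.common.collect.SetMultimap;

    public class TagMultimapSketch {
      public static void main(String[] args) {
        SetMultimap<String, String> queryTagToIdMap =
            Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());
        queryTagToIdMap.put("mytag", "queryId-1");
        queryTagToIdMap.put("mytag", "queryId-2");        // second query under the same tag: now allowed
        System.out.println(queryTagToIdMap.get("mytag")); // both ids, unordered
        queryTagToIdMap.remove("mytag", "queryId-1");     // removal is per (tag, id) pair, mirroring the patch
        System.out.println(queryTagToIdMap.get("mytag")); // [queryId-2]
      }
    }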
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 490a04da675..c7f2c9117b9 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -21,8 +21,13 @@ package org.apache.hive.service.server;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -40,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -49,6 +55,7 @@ public class KillQueryImpl implements KillQuery {
  private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
private final OperationManager operationManager;
+ private enum TagOrId {TAG, ID, UNKNOWN};
public KillQueryImpl(OperationManager operationManager) {
this.operationManager = operationManager;
@@ -64,7 +71,10 @@ public class KillQueryImpl implements KillQuery {
GetApplicationsResponse apps = proxy.getApplications(gar);
List<ApplicationReport> appsList = apps.getApplicationList();
for(ApplicationReport appReport : appsList) {
- childYarnJobs.add(appReport.getApplicationId());
+      if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get()
+          .getUserName())) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
}
if (childYarnJobs.isEmpty()) {
@@ -81,6 +91,7 @@ public class KillQueryImpl implements KillQuery {
if (tag == null) {
return;
}
+ LOG.info("Killing yarn jobs using query tag:" + tag);
Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
if (!childYarnJobs.isEmpty()) {
YarnClient yarnClient = YarnClient.createYarnClient();
@@ -91,44 +102,87 @@ public class KillQueryImpl implements KillQuery {
}
}
} catch (IOException | YarnException ye) {
- throw new RuntimeException("Exception occurred while killing child
job(s)", ye);
+ LOG.warn("Exception occurred while killing child job({})", ye);
+ }
+ }
+
+ private static boolean isAdmin() {
+ boolean isAdmin = false;
+ if (SessionState.get().getAuthorizerV2() != null) {
+ try {
+        SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY,
+            new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(),
+            new HiveAuthzContext.Builder().build());
+ isAdmin = true;
+ } catch (Exception e) {
+ }
+ }
+ return isAdmin;
+ }
+
+  private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws
+      HiveSQLException {
+    if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get()
+        .getAuthenticator().getUserName())) {
+ OperationHandle handle = operation.getHandle();
+ operationManager.cancelOperation(handle, errMsg);
+ return true;
+ } else {
+ return false;
}
}
@Override
-  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
+  public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
try {
- String queryTag = null;
-
- Operation operation = operationManager.getOperationByQueryId(queryId);
- if (operation == null) {
-        // Check if user has passed the query tag to kill the operation. This is possible if the application
-        // restarts and it does not have the proper query id. The tag can be used in that case to kill the query.
- operation = operationManager.getOperationByQueryTag(queryId);
- if (operation == null) {
- LOG.info("Query not found: " + queryId);
- }
+ TagOrId tagOrId = TagOrId.UNKNOWN;
+ Set<Operation> operationsToKill = new HashSet<Operation>();
+ if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+        operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
+ tagOrId = TagOrId.ID;
} else {
-        // This is the normal flow, where the query is tagged and user wants to kill the query using the query id.
-        queryTag = operation.getQueryTag();
+        operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
+ if (!operationsToKill.isEmpty()) {
+ tagOrId = TagOrId.TAG;
+ }
}
-
- if (queryTag == null) {
-        //use query id as tag if user wanted to kill only the yarn jobs after hive server restart. The yarn jobs are
-        //tagged with query id by default. This will cover the case where the application after restarts wants to kill
-        //the yarn jobs with query tag. The query tag can be passed as query id.
-        queryTag = queryId;
+ if (operationsToKill.isEmpty()) {
+ LOG.info("Query not found: " + queryIdOrTag);
}
-
- LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :"
+ queryTag);
- killChildYarnJobs(conf, queryTag);
-
- if (operation != null) {
- OperationHandle handle = operation.getHandle();
- operationManager.cancelOperation(handle, errMsg);
+ boolean admin = isAdmin();
+ switch(tagOrId) {
+ case ID:
+ Operation operation = operationsToKill.iterator().next();
+ boolean canceled = cancelOperation(operation, admin, errMsg);
+ if (canceled) {
+ String queryTag = operation.getQueryTag();
+ if (queryTag == null) {
+ queryTag = queryIdOrTag;
+ }
+ killChildYarnJobs(conf, queryTag);
+ } else {
+ // no privilege to cancel
+ throw new HiveSQLException("No privilege");
+ }
+ break;
+ case TAG:
+ int numCanceled = 0;
+ for (Operation operationToKill : operationsToKill) {
+ if (cancelOperation(operationToKill, admin, errMsg)) {
+ numCanceled++;
+ }
+ }
+ killChildYarnJobs(conf, queryIdOrTag);
+ if (numCanceled == 0) {
+ throw new HiveSQLException("No privilege");
+ }
+ break;
+ case UNKNOWN:
+ killChildYarnJobs(conf, queryIdOrTag);
+ break;
}
} catch (HiveSQLException e) {
- LOG.error("Kill query failed for query " + queryId, e);
+ LOG.error("Kill query failed for query " + queryIdOrTag, e);
throw new HiveException(e.getMessage(), e);
}
}