http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/ql/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 40cafcc,ee539ba..728dae4 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@@ -32,20 -34,24 +34,24 @@@ public class KillTriggerActionHandler i private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); @Override - public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) { - for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) { + public void applyAction(final Map<TezSession, Trigger> queriesViolated) { + for (Map.Entry<TezSession, Trigger> entry : queriesViolated.entrySet()) { switch (entry.getValue().getAction().getType()) { case KILL_QUERY: - TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getWmContext().getQueryId(); + TezSession sessionState = entry.getKey(); try { - // then session might have been released to pool or closed already ++ // if kill query is null then session might have been released to pool or closed already ++ // TODO: what's up with this hacky session-start code? + SessionState ss = new SessionState(new HiveConf()); + ss.setIsHiveServerQuery(true); + SessionState.start(ss); - KillQuery killQuery = sessionState.getKillQuery(); - // if kill query is null then session might have been released to pool or closed already - if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(), - sessionState.getConf()); + boolean wasKilled = sessionState.killQuery(entry.getValue().getViolationMsg()); + if (!wasKilled) { + LOG.info("Didn't kill the query {}", sessionState.getWmContext().getQueryId()); } } catch (HiveException e) { - LOG.warn("Unable to kill query {} for trigger violation"); + LOG.warn("Unable to kill query {} for trigger violation", + sessionState.getWmContext().getQueryId()); } break; default: http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c55347d5/service/src/java/org/apache/hive/service/server/KillQueryImpl.java ---------------------------------------------------------------------- diff --cc service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 2c23253,c7f2c91..65a6158 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@@ -96,42 -133,56 +133,66 @@@ public class KillQueryImpl implements K } @Override - public void killQuery( - String queryId, String errMsg, HiveConf conf, boolean isYarn) throws HiveException { - public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException { ++ public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf, boolean isYarn) throws HiveException { try { - String queryTag = null; - - Operation operation = operationManager.getOperationByQueryId(queryId); - if (operation == null) { - // Check if user has passed the query tag to kill the operation. This is possible if the application - // restarts and it does not have the proper query id. The tag can be used in that case to kill the query. - operation = operationManager.getOperationByQueryTag(queryId); - if (operation == null) { - LOG.info("Query not found: " + queryId); - } + TagOrId tagOrId = TagOrId.UNKNOWN; + Set<Operation> operationsToKill = new HashSet<Operation>(); + if (operationManager.getOperationByQueryId(queryIdOrTag) != null) { + operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag)); + tagOrId = TagOrId.ID; } else { - // This is the normal flow, where the query is tagged and user wants to kill the query using the query id. - queryTag = operation.getQueryTag(); - } - - if (queryTag == null) { - //use query id as tag if user wanted to kill only the yarn jobs after hive server restart. The yarn jobs are - //tagged with query id by default. This will cover the case where the application after restarts wants to kill - //the yarn jobs with query tag. The query tag can be passed as query id. - queryTag = queryId; + operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag)); + if (!operationsToKill.isEmpty()) { + tagOrId = TagOrId.TAG; + } } - - if (isYarn) { - LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag); - killChildYarnJobs(conf, queryTag); + if (operationsToKill.isEmpty()) { + LOG.info("Query not found: " + queryIdOrTag); } + - if (operation != null) { - OperationHandle handle = operation.getHandle(); - operationManager.cancelOperation(handle, errMsg); + boolean admin = isAdmin(); + switch(tagOrId) { + case ID: + Operation operation = operationsToKill.iterator().next(); + boolean canceled = cancelOperation(operation, admin, errMsg); + if (canceled) { + String queryTag = operation.getQueryTag(); + if (queryTag == null) { + queryTag = queryIdOrTag; + } - killChildYarnJobs(conf, queryTag); ++ if (isYarn) { ++ LOG.info("Killing yarn jobs for " + queryIdOrTag); ++ killChildYarnJobs(conf, queryTag); ++ } + } else { + // no privilege to cancel + throw new HiveSQLException("No privilege"); + } + break; + case TAG: + int numCanceled = 0; + for (Operation operationToKill : operationsToKill) { + if (cancelOperation(operationToKill, admin, errMsg)) { + numCanceled++; + } + } - killChildYarnJobs(conf, queryIdOrTag); ++ if (isYarn) { ++ LOG.info("Killing yarn jobs for " + queryIdOrTag); ++ killChildYarnJobs(conf, queryIdOrTag); ++ } + if (numCanceled == 0) { + throw new HiveSQLException("No privilege"); + } + break; + case UNKNOWN: - killChildYarnJobs(conf, queryIdOrTag); ++ if (isYarn) { ++ LOG.info("Killing yarn jobs for " + queryIdOrTag); ++ killChildYarnJobs(conf, queryIdOrTag); ++ } + break; } } catch (HiveSQLException e) { - LOG.error("Kill query failed for query " + queryId, e); + LOG.error("Kill query failed for query " + queryIdOrTag, e); throw new HiveException(e.getMessage(), e); } }
