This is an automated email from the ASF dual-hosted git repository.
mgergely pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 23db35e HIVE-22526 Extract Compiler from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
23db35e is described below
commit 23db35e092ce1d09c5993b45c8b0f790505fc1a5
Author: miklosgergely <[email protected]>
AuthorDate: Thu Nov 21 13:38:46 2019 +0100
HIVE-22526 Extract Compiler from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
---
.../java/org/apache/hadoop/hive/ql/Compiler.java | 483 ++++++++++++++
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 721 ++++-----------------
.../org/apache/hadoop/hive/ql/DriverContext.java | 8 +
.../org/apache/hadoop/hive/ql/DriverUtils.java | 74 ++-
.../apache/hadoop/hive/ql/exec/ExplainTask.java | 54 +-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 9 +
6 files changed, 753 insertions(+), 596 deletions(-)
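
At a glance, the patch moves Driver's inline compilation logic into the new Compiler class, and relocates shared helpers (checkInterrupted, checkConcurrency, getUserFromUGI, createProcessorException) to DriverUtils. A minimal sketch of the resulting call shape inside Driver.compile(), condensed from the Driver.java hunks below (illustrative only, not a compilable excerpt of the commit):

    // Driver.compile(String command, boolean resetTaskIds, boolean deferClose), condensed:
    prepareForCompile(resetTaskIds);                         // reset task ids, create Context, set query id
    Compiler compiler = new Compiler(context, driverContext, driverState);
    QueryPlan plan = compiler.compile(command, deferClose);  // parse, analyze, plan, authorize
    driverContext.setPlan(plan);
    compileFinished(deferClose);                             // close in-process state if the query was aborted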
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
new file mode 100644
index 0000000..a559d90
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveVariableSource;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * The compiler compiles the command: it creates a QueryPlan from a String command,
+ * and opens a transaction if necessary.
+ */
+public class Compiler {
+ private static final String CLASS_NAME = Driver.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+ private final Context context;
+ private final DriverContext driverContext;
+ private final DriverState driverState;
+
+ private ASTNode tree;
+
+ public Compiler(Context context, DriverContext driverContext, DriverState driverState) {
+ this.context = context;
+ this.driverContext = driverContext;
+ this.driverState = driverState;
+ }
+
+ /**
+ * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted.
+ *        It should be set to true if the compile is called within another method like runInternal,
+ *        which defers the close to the caller of that method.
+ */
+ public QueryPlan compile(String rawCommand, boolean deferClose) throws CommandProcessorException {
+ initialize(rawCommand);
+
+ boolean compileError = false;
+ boolean parsed = false;
+ QueryPlan plan = null;
+ try {
+ DriverUtils.checkInterrupted(driverState, driverContext, "before parsing and analysing the query", null, null);
+
+ parse();
+ parsed = true;
+ BaseSemanticAnalyzer sem = analyze();
+
+ DriverUtils.checkInterrupted(driverState, driverContext, "after analyzing query.", null, null);
+
+ plan = createPlan(sem);
+ authorize(sem);
+ explainOutput(sem, plan);
+ } catch (CommandProcessorException cpe) {
+ compileError = true;
+ throw cpe;
+ } catch (Exception e) {
+ compileError = true;
+ DriverUtils.checkInterrupted(driverState, driverContext, "during query compilation: " + e.getMessage(), null, null);
+ handleException(e);
+ } finally {
+ cleanUp(compileError, parsed, deferClose);
+ }
+
+ return plan;
+ }
+
+ private void initialize(String rawCommand) throws CommandProcessorException {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
+ driverState.compilingWithLocking();
+
+ VariableSubstitution variableSubstitution = new VariableSubstitution(new HiveVariableSource() {
+ @Override
+ public Map<String, String> getHiveVariable() {
+ return SessionState.get().getHiveVariables();
+ }
+ });
+ String command = variableSubstitution.substitute(driverContext.getConf(), rawCommand);
+
+ String queryStr = command;
+ try {
+ // command should be redacted to avoid logging sensitive data
+ queryStr = HookUtils.redactLogString(driverContext.getConf(), command);
+ } catch (Exception e) {
+ LOG.warn("WARNING! Query command could not be redacted." + e);
+ }
+
+ DriverUtils.checkInterrupted(driverState, driverContext, "at beginning of compilation.", null, null);
+
+ context.setCmd(command);
+ driverContext.getQueryDisplay().setQueryStr(queryStr);
+ LOG.info("Compiling command(queryId=" + driverContext.getQueryId() + "): " + queryStr);
+
+ driverContext.getConf().setQueryString(queryStr);
+ // FIXME: side effect will leave the last query set at the session level
+ if (SessionState.get() != null) {
+ SessionState.get().getConf().setQueryString(queryStr);
+ SessionState.get().setupQueryCurrentTimestamp();
+ }
+ }
+
+ private void parse() throws ParseException {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
+
+ // Trigger query hook before compilation
+ driverContext.getHookRunner().runBeforeParseHook(context.getCmd());
+
+ boolean success = false;
+ try {
+ tree = ParseUtils.parse(context.getCmd(), context);
+ success = true;
+ } finally {
+ driverContext.getHookRunner().runAfterParseHook(context.getCmd(), !success);
+ }
+ SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
+ }
+
+ private BaseSemanticAnalyzer analyze() throws Exception {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
+
+ driverContext.getHookRunner().runBeforeCompileHook(context.getCmd());
+
+ // clear CurrentFunctionsInUse set, to capture new set of functions
+ // that SemanticAnalyzer finds are in use
+ SessionState.get().getCurrentFunctionsInUse().clear();
+
+ // Flush the metastore cache. This assures that we don't pick up objects from a previous
+ // query running in this same thread. This has to be done after we get our semantic
+ // analyzer (this is when the connection to the metastore is made) but before we analyze,
+ // because at that point we need access to the objects.
+ Hive.get().getMSC().flushCache();
+
+ driverContext.setBackupContext(new Context(context));
+ boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
+
+ HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+ if (executeHooks) {
+ hookCtx.setConf(driverContext.getConf());
+ hookCtx.setUserName(SessionState.get().getUserName());
+ hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
+ hookCtx.setCommand(context.getCmd());
+ hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation());
+
+ tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree);
+ }
+
+ // SemanticAnalyzerFactory also sets the hive operation in query state
+ BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree);
+
+ if (!driverContext.isRetrial()) {
+ if ((driverContext.getQueryState().getHiveOperation() != null) &&
+ driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) {
+ setLastReplIdForDump(driverContext.getQueryState().getConf());
+ }
+ driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), tree));
+ openTransaction(driverContext.getTxnType());
+
+ generateValidTxnList();
+ }
+
+ // Do semantic analysis and plan generation
+ sem.analyze(tree, context);
+
+ if (executeHooks) {
+ hookCtx.update(sem);
+ driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx,
sem.getAllRootTasks());
+ }
+
+ LOG.info("Semantic Analysis Completed (retrial = {})",
driverContext.isRetrial());
+
+ // Retrieve information about cache usage for the query.
+ if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+ driverContext.setCacheUsage(sem.getCacheUsage());
+ }
+
+ // validate the plan
+ sem.validate();
+
+ SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
+
+ return sem;
+ }
+
+ /**
+ * Last repl id should be captured before opening txn by current REPL DUMP operation.
+ * This is needed to avoid losing data which are added/modified by concurrent txns while a
+ * bootstrap dump is in progress.
+ * @param conf Query configurations
+ * @throws HiveException
+ * @throws TException
+ */
+ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException {
+ // Last logged notification event id would be the last repl Id for the current REPL DUMP.
+ Hive hiveDb = Hive.get();
+ Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId);
+ LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
+ }
+
+ private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
+ if (DriverUtils.checkConcurrency(driverContext) && startImplicitTxn(driverContext.getTxnManager()) &&
+ !driverContext.getTxnManager().isTxnOpen()) {
+ String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
+ driverContext.getTxnManager().openTxn(context, userFromUGI, txnType);
+ }
+ }
+
+ private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
+ //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+ HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation();
+ switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) {
+ case COMMIT:
+ case ROLLBACK:
+ if (!txnManager.isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName());
+ }
+ case SWITCHDATABASE:
+ case SET_AUTOCOMMIT:
+ /**
+ * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
+ * semantics (or any other definition of autocommit) it should be done at session level.
+ */
+ case SHOWDATABASES:
+ case SHOWTABLES:
+ case SHOWCOLUMNS:
+ case SHOWFUNCTIONS:
+ case SHOWPARTITIONS:
+ case SHOWLOCKS:
+ case SHOWVIEWS:
+ case SHOW_ROLES:
+ case SHOW_ROLE_PRINCIPALS:
+ case SHOW_COMPACTIONS:
+ case SHOW_TRANSACTIONS:
+ case ABORT_TRANSACTIONS:
+ case KILL_QUERY:
+ return false;
+ //this implies that no locks are needed for such a command
+ default:
+ return !context.isExplainPlan();
+ }
+ }
+
+ private void generateValidTxnList() throws LockException {
+ // Record current valid txn list that will be used throughout the query
+ // compilation and processing. We only do this if 1) a transaction
+ // was already opened and 2) the list has not been recorded yet,
+ // e.g., by an explicit open transaction command.
+ driverContext.setValidTxnListsGenerated(false);
+ String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+ if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) {
+ try {
+ recordValidTxns(driverContext.getTxnManager());
+ driverContext.setValidTxnListsGenerated(true);
+ } catch (LockException e) {
+ LOG.error("Exception while acquiring valid txn list", e);
+ throw e;
+ }
+ }
+ }
+
+ // Write the current set of valid transactions into the conf file
+ private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
+ String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+ if ((oldTxnString != null) && (oldTxnString.length() > 0)) {
+ throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
+ JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+ }
+ ValidTxnList txnList = txnMgr.getValidTxns();
+ String txnStr = txnList.toString();
+ driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr);
+ LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
+ }
+
+ private QueryPlan createPlan(BaseSemanticAnalyzer sem) {
+ // get the output schema
+ setSchema(sem);
+ QueryPlan plan = new QueryPlan(driverContext.getQueryString(), sem,
+ driverContext.getQueryDisplay().getQueryStartTime(), driverContext.getQueryId(),
+ driverContext.getQueryState().getHiveOperation(), driverContext.getSchema());
+ // save the optimized plan and sql for the explain
+ plan.setOptimizedCBOPlan(context.getCalcitePlan());
+ plan.setOptimizedQueryString(context.getOptimizedSql());
+
+ driverContext.getConf().set("mapreduce.workflow.id", "hive_" + driverContext.getQueryId());
+ driverContext.getConf().set("mapreduce.workflow.name", driverContext.getQueryString());
+
+ // initialize FetchTask right here
+ if (plan.getFetchTask() != null) {
+ plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, context);
+ }
+
+ return plan;
+ }
+
+ /**
+ * Get a Schema with fields represented with native Hive types.
+ */
+ private void setSchema(BaseSemanticAnalyzer sem) {
+ Schema schema = new Schema();
+
+ // If we have a plan, prefer its logical result schema if it's available; otherwise, try digging
+ // out a fetch task; failing that, give up.
+ if (sem == null) {
+ LOG.info("No semantic analyzer, using empty schema.");
+ } else if (sem.getResultSchema() != null) {
+ List<FieldSchema> lst = sem.getResultSchema();
+ schema = new Schema(lst, null);
+ } else if (sem.getFetchTask() != null) {
+ FetchTask ft = sem.getFetchTask();
+ TableDesc td = ft.getTblDesc();
+ // partitioned tables don't have tableDesc set on the FetchTask. Instead they have a list of
+ // PartitionDesc objects, each with a table desc. Let's try to fetch the desc for the first
+ // partition and use its deserializer.
+ if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) {
+ if (ft.getWork().getPartDesc().size() > 0) {
+ td = ft.getWork().getPartDesc().get(0).getTableDesc();
+ }
+ }
+
+ if (td == null) {
+ LOG.info("No returning schema, using empty schema");
+ } else {
+ String tableName = "result";
+ List<FieldSchema> lst = null;
+ try {
+ lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(driverContext.getConf()));
+ } catch (Exception e) {
+ LOG.warn("Error getting schema: " + StringUtils.stringifyException(e));
+ }
+ if (lst != null) {
+ schema = new Schema(lst, null);
+ }
+ }
+ }
+
+ LOG.info("Created Hive schema: " + schema);
+ driverContext.setSchema(schema);
+ }
+
+ private void authorize(BaseSemanticAnalyzer sem) throws HiveException, CommandProcessorException {
+ // do the authorization check
+ if (!sem.skipAuthorization() &&
+ HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+
+ try {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
+ // The authorization check for KILL QUERY is done in KillQueryImpl, as both the admin and the
+ // operation owner can perform the operation, which is not directly supported in the authorizer.
+ if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) {
+ CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, context.getCmd());
+ }
+ } catch (AuthorizationException authExp) {
+ CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details.");
+ throw DriverUtils.createProcessorException(driverContext, 403, authExp.getMessage(), "42000", null);
+ } finally {
+ SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
+ }
+ }
+ }
+
+ private void explainOutput(BaseSemanticAnalyzer sem, QueryPlan plan) throws IOException {
+ if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) ||
+ driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) {
+ String explainOutput = ExplainTask.getExplainOutput(sem, plan, tree, driverContext.getQueryState(),
+ context, driverContext.getConf());
+ if (explainOutput != null) {
+ if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
+ LOG.info("EXPLAIN output for queryid " + driverContext.getQueryId() + " : " + explainOutput);
+ }
+ if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() &&
+ driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) {
+ driverContext.getQueryDisplay().setExplainPlan(explainOutput);
+ }
+ }
+ }
+ }
+
+ private void handleException(Exception e) throws CommandProcessorException {
+ ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
+ String errorMessage = "FAILED: " + e.getClass().getSimpleName();
+ if (error != ErrorMsg.GENERIC_ERROR) {
+ errorMessage += " [Error " + error.getErrorCode() + "]:";
+ }
+
+ // HIVE-4889
+ if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
+ errorMessage += " " + e.getCause().getMessage();
+ } else {
+ errorMessage += " " + e.getMessage();
+ }
+
+ if (error == ErrorMsg.TXNMGR_NOT_ACID) {
+ errorMessage += ". Failed command: " + driverContext.getQueryString();
+ }
+
+ CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
+ throw DriverUtils.createProcessorException(driverContext, error.getErrorCode(), errorMessage, error.getSQLState(), e);
+ }
+
+ private void cleanUp(boolean compileError, boolean parsed, boolean deferClose) {
+ // Trigger post compilation hook. Note that if the compilation fails here then
+ // before/after execution hook will never be executed.
+ if (parsed) {
+ try {
+ driverContext.getHookRunner().runAfterCompilationHook(context.getCmd(), compileError);
+ } catch (Exception e) {
+ LOG.warn("Failed when invoking query after-compilation hook.", e);
+ }
+ }
+
+ double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00;
+ ImmutableMap<String, Long> compileHMSTimings = Hive.dumpMetaCallTimingWithoutEx("compilation");
+ driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);
+
+ if (driverState.isAborted()) {
+ driverState.compilationInterruptedWithLocking(deferClose);
+ LOG.info("Compiling command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(), duration);
+ } else {
+ driverState.compilationFinishedWithLocking(compileError);
+ LOG.info("Completed compiling command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), duration);
+ }
+ }
+}
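
Compiler.compile() above is a linear pipeline wrapped in interruption checks and a try/catch/finally; a condensed sketch of its control flow (method names as in the file above; the DriverUtils.checkInterrupted calls and the exception plumbing are elided, so this is illustrative rather than a verbatim excerpt):

    public QueryPlan compile(String rawCommand, boolean deferClose) throws CommandProcessorException {
      initialize(rawCommand);                // variable substitution, redaction, query-string bookkeeping
      parse();                               // ParseUtils.parse plus before/after parse hooks
      BaseSemanticAnalyzer sem = analyze();  // analyze hooks, transaction open, sem.analyze + sem.validate
      QueryPlan plan = createPlan(sem);      // output schema, QueryPlan, FetchTask initialization
      authorize(sem);                        // CommandAuthorizer, unless skipped or KILL QUERY
      explainOutput(sem, plan);              // optional EXPLAIN logging / web UI plan
      return plan;                           // cleanUp(compileError, parsed, deferClose) runs in a finally block
    }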
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a28bf16..7549144 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -18,10 +18,8 @@
package org.apache.hadoop.hive.ql;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
-import java.io.PrintStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
@@ -48,10 +46,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.conf.HiveVariableSource;
-import org.apache.hadoop.hive.conf.VariableSubstitution;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.Schema;
@@ -73,12 +67,10 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -89,22 +81,14 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.HiveTableName;
-import org.apache.hadoop.hive.ql.parse.ParseException;
-import org.apache.hadoop.hive.ql.parse.ParseUtils;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -112,7 +96,6 @@ import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer;
import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -122,8 +105,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.common.util.TxnIdUtils;
-import org.apache.thrift.TException;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,7 +124,7 @@ public class Driver implements IDriver {
private ByteStream.Output bos = new ByteStream.Output();
private DataInput resStream;
- private Context ctx;
+ private Context context;
private final DriverContext driverContext;
private TaskQueue taskQueue;
private final List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
@@ -153,10 +134,6 @@ public class Driver implements IDriver {
private DriverState driverState = new DriverState();
- private boolean checkConcurrency() {
- return driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
- }
-
@Override
public Schema getSchema() {
return driverContext.getSchema();
@@ -168,61 +145,11 @@ public class Driver implements IDriver {
@Override
public Context getContext() {
- return ctx;
+ return context;
}
public PlanMapper getPlanMapper() {
- return ctx.getPlanMapper();
- }
-
- /**
- * Get a Schema with fields represented with native Hive types
- */
- private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
- Schema schema = null;
-
- // If we have a plan, prefer its logical result schema if it's
- // available; otherwise, try digging out a fetch task; failing that,
- // give up.
- if (sem == null) {
- // can't get any info without a plan
- } else if (sem.getResultSchema() != null) {
- List<FieldSchema> lst = sem.getResultSchema();
- schema = new Schema(lst, null);
- } else if (sem.getFetchTask() != null) {
- FetchTask ft = sem.getFetchTask();
- TableDesc td = ft.getTblDesc();
- // partitioned tables don't have tableDesc set on the FetchTask. Instead
- // they have a list of PartitionDesc objects, each with a table desc.
- // Let's
- // try to fetch the desc for the first partition and use it's
- // deserializer.
- if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) {
- if (ft.getWork().getPartDesc().size() > 0) {
- td = ft.getWork().getPartDesc().get(0).getTableDesc();
- }
- }
-
- if (td == null) {
- LOG.info("No returning schema.");
- } else {
- String tableName = "result";
- List<FieldSchema> lst = null;
- try {
- lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
- } catch (Exception e) {
- LOG.warn("Error getting schema: " + StringUtils.stringifyException(e));
- }
- if (lst != null) {
- schema = new Schema(lst, null);
- }
- }
- }
- if (schema == null) {
- schema = new Schema();
- }
- LOG.info("Returning Hive schema: " + schema);
- return schema;
+ return context.getPlanMapper();
}
/**
@@ -242,7 +169,7 @@ public class Driver implements IDriver {
// or compile another query
public Driver(HiveConf conf, Context ctx, LineageState lineageState) {
this(getNewQueryState(conf, lineageState), null);
- this.ctx = ctx;
+ context = ctx;
}
public Driver(QueryState queryState) {
@@ -293,261 +220,30 @@ public class Driver implements IDriver {
// runInternal, which defers the close to the called in that method.
@VisibleForTesting
public void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException {
- createTransactionManager();
+ prepareForCompile(resetTaskIds);
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
- driverState.compilingWithLocking();
+ Compiler compiler = new Compiler(context, driverContext, driverState);
+ QueryPlan plan = compiler.compile(command, deferClose);
+ driverContext.setPlan(plan);
- command = new VariableSubstitution(new HiveVariableSource() {
- @Override
- public Map<String, String> getHiveVariable() {
- return SessionState.get().getHiveVariables();
- }
- }).substitute(driverContext.getConf(), command);
-
- String queryStr = command;
+ compileFinished(deferClose);
+ }
- try {
- // command should be redacted to avoid to logging sensitive data
- queryStr = HookUtils.redactLogString(driverContext.getConf(), command);
- } catch (Exception e) {
- LOG.warn("WARNING! Query command could not be redacted." + e);
+ private void compileFinished(boolean deferClose) {
+ if (DriverState.getDriverState().isAborted() && !deferClose) {
+ closeInProcess(true);
}
+ }
- checkInterrupted("at beginning of compilation.", null, null);
-
- if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
- // close the existing ctx etc before compiling a new query, but does not destroy driver
- closeInProcess(false);
- }
+ private void prepareForCompile(boolean resetTaskIds) throws CommandProcessorException {
+ createTransactionManager();
+ DriverState.setDriverState(driverState);
+ prepareContext();
+ setQueryId();
if (resetTaskIds) {
TaskFactory.resetId();
}
-
- DriverState.setDriverState(driverState);
-
- final String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ?
- QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId();
-
- SparkSession ss = SessionState.get().getSparkSession();
- if (ss != null) {
- ss.onQuerySubmission(queryId);
- }
-
- if (ctx != null) {
- setTriggerContext(queryId);
- }
- //save some info for webUI for use after plan is freed
- driverContext.getQueryDisplay().setQueryStr(queryStr);
- driverContext.getQueryDisplay().setQueryId(queryId);
-
- LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);
-
- driverContext.getConf().setQueryString(queryStr);
- // FIXME: sideeffect will leave the last query set at the session level
- if (SessionState.get() != null) {
- SessionState.get().getConf().setQueryString(queryStr);
- SessionState.get().setupQueryCurrentTimestamp();
- }
-
- // Whether any error occurred during query compilation. Used for query lifetime hook.
- boolean compileError = false;
- boolean parseError = false;
-
- try {
- checkInterrupted("before parsing and analysing the query", null, null);
-
- if (ctx == null) {
- ctx = new Context(driverContext.getConf());
- setTriggerContext(queryId);
- }
-
- ctx.setHiveTxnManager(driverContext.getTxnManager());
- ctx.setStatsSource(driverContext.getStatsSource());
- ctx.setCmd(command);
- ctx.setHDFSCleanup(true);
-
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
-
- // Trigger query hook before compilation
- driverContext.getHookRunner().runBeforeParseHook(command);
-
- ASTNode tree;
- try {
- tree = ParseUtils.parse(command, ctx);
- } catch (ParseException e) {
- parseError = true;
- throw e;
- } finally {
- driverContext.getHookRunner().runAfterParseHook(command, parseError);
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
-
- driverContext.getHookRunner().runBeforeCompileHook(command);
- // clear CurrentFunctionsInUse set, to capture new set of functions
- // that SemanticAnalyzer finds are in use
- SessionState.get().getCurrentFunctionsInUse().clear();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
-
- // Flush the metastore cache. This assures that we don't pick up objects from a previous
- // query running in this same thread. This has to be done after we get our semantic
- // analyzer (this is when the connection to the metastore is made) but before we analyze,
- // because at that point we need access to the objects.
- Hive.get().getMSC().flushCache();
-
- driverContext.setBackupContext(new Context(ctx));
- boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
-
- HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
- if (executeHooks) {
- hookCtx.setConf(driverContext.getConf());
- hookCtx.setUserName(SessionState.get().getUserName());
- hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
- hookCtx.setCommand(command);
- hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation());
-
- tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree);
- }
-
- // Do semantic analysis and plan generation
- BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree);
-
- if (!driverContext.isRetrial()) {
- if ((driverContext.getQueryState().getHiveOperation() != null) &&
- driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) {
- setLastReplIdForDump(driverContext.getQueryState().getConf());
- }
- driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), tree));
- openTransaction(driverContext.getTxnType());
-
- generateValidTxnList();
- }
-
- sem.analyze(tree, ctx);
-
- if (executeHooks) {
- hookCtx.update(sem);
- driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
- }
-
- LOG.info("Semantic Analysis Completed (retrial = {})",
driverContext.isRetrial());
-
- // Retrieve information about cache usage for the query.
- if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
- driverContext.setCacheUsage(sem.getCacheUsage());
- }
-
- // validate the plan
- sem.validate();
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
-
- checkInterrupted("after analyzing query.", null, null);
-
- // get the output schema
- driverContext.setSchema(getSchema(sem, driverContext.getConf()));
- QueryPlan plan = new QueryPlan(queryStr, sem, driverContext.getQueryDisplay().getQueryStartTime(), queryId,
- driverContext.getQueryState().getHiveOperation(), driverContext.getSchema());
- // save the optimized plan and sql for the explain
- plan.setOptimizedCBOPlan(ctx.getCalcitePlan());
- plan.setOptimizedQueryString(ctx.getOptimizedSql());
- driverContext.setPlan(plan);
-
- driverContext.getConf().set("mapreduce.workflow.id", "hive_" + queryId);
- driverContext.getConf().set("mapreduce.workflow.name", queryStr);
-
- // initialize FetchTask right here
- if (plan.getFetchTask() != null) {
- plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, ctx);
- }
-
- //do the authorization check
- if (!sem.skipAuthorization() &&
- HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
-
- try {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
- // Authorization check for kill query will be in KillQueryImpl
- // As both admin or operation owner can perform the operation.
- // Which is not directly supported in authorizer
- if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) {
- CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, command);
- }
- } catch (AuthorizationException authExp) {
- CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details.");
- throw createProcessorException(403, authExp.getMessage(), "42000", null);
- } finally {
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
- }
- }
-
- if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)
- || driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) {
- String explainOutput = getExplainOutput(sem, plan, tree);
- if (explainOutput != null) {
- if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
- LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput);
- }
- if (driverContext.getConf().isWebUiQueryInfoCacheEnabled()
- && driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) {
- driverContext.getQueryDisplay().setExplainPlan(explainOutput);
- }
- }
- }
- } catch (CommandProcessorException cpe) {
- throw cpe;
- } catch (Exception e) {
- checkInterrupted("during query compilation: " + e.getMessage(), null,
null);
-
- compileError = true;
- ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
- String errorMessage = "FAILED: " + e.getClass().getSimpleName();
- if (error != ErrorMsg.GENERIC_ERROR) {
- errorMessage += " [Error " + error.getErrorCode() + "]:";
- }
-
- // HIVE-4889
- if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {
- errorMessage += " " + e.getCause().getMessage();
- } else {
- errorMessage += " " + e.getMessage();
- }
-
- if (error == ErrorMsg.TXNMGR_NOT_ACID) {
- errorMessage += ". Failed command: " + queryStr;
- }
-
- CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
- throw createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e);
- } finally {
- // Trigger post compilation hook. Note that if the compilation fails here then
- // before/after execution hook will never be executed.
- if (!parseError) {
- try {
- driverContext.getHookRunner().runAfterCompilationHook(command, compileError);
- } catch (Exception e) {
- LOG.warn("Failed when invoking query after-compilation hook.", e);
- }
- }
-
- double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00;
- ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");
- driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);
-
- boolean isInterrupted = driverState.isAborted();
- if (isInterrupted && !deferClose) {
- closeInProcess(true);
- }
-
- if (isInterrupted) {
- driverState.compilationInterruptedWithLocking(deferClose);
- LOG.info("Compiling command(queryId=" + queryId + ") has been
interrupted after " + duration + " seconds");
- } else {
- driverState.compilationFinishedWithLocking(compileError);
- LOG.info("Completed compiling command(queryId=" + queryId + "); Time
taken: " + duration + " seconds");
- }
- }
}
private void createTransactionManager() throws CommandProcessorException {
@@ -582,8 +278,55 @@ public class Driver implements IDriver {
String errorMessage = "FAILED: " + e.getClass().getSimpleName() + "
[Error " + error.getErrorCode() + "]:";
CONSOLE.printError(errorMessage, "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(error.getErrorCode(), errorMessage,
error.getSQLState(), e);
+ throw DriverUtils.createProcessorException(driverContext,
error.getErrorCode(), errorMessage, error.getSQLState(),
+ e);
+ }
+ }
+
+ private void prepareContext() throws CommandProcessorException {
+ if (context != null && context.getExplainAnalyze() != AnalyzeState.RUNNING) {
+ // close the existing ctx etc before compiling a new query, but does not destroy driver
+ closeInProcess(false);
+ }
+
+ try {
+ if (context == null) {
+ context = new Context(driverContext.getConf());
+ }
+ } catch (IOException e) {
+ throw new CommandProcessorException(e);
+ }
+
+ context.setHiveTxnManager(driverContext.getTxnManager());
+ context.setStatsSource(driverContext.getStatsSource());
+ context.setHDFSCleanup(true);
+ }
+
+ private void setQueryId() {
+ String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ?
+ QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId();
+
+ SparkSession ss = SessionState.get().getSparkSession();
+ if (ss != null) {
+ ss.onQuerySubmission(queryId);
}
+ driverContext.getQueryDisplay().setQueryId(queryId);
+
+ setTriggerContext(queryId);
+ }
+
+ private void setTriggerContext(String queryId) {
+ long queryStartTime;
+ // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
+ // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also
+ // the time when query display object is created)
+ if (driverContext.getQueryInfo() != null) {
+ queryStartTime = driverContext.getQueryInfo().getBeginTime();
+ } else {
+ queryStartTime = driverContext.getQueryDisplay().getQueryStartTime();
+ }
+ WmContext wmContext = new WmContext(queryStartTime, queryId);
+ context.setWmContext(wmContext);
}
// Checks whether txn list has been invalidated while planning the query.
@@ -606,12 +349,12 @@ public class Driver implements IDriver {
// 2) Get locks that are relevant:
// - Exclusive for INSERT OVERWRITE.
// - Semi-shared for UPDATE/DELETE.
- if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+ if (context.getHiveLocks() == null || context.getHiveLocks().isEmpty()) {
// Nothing to check
return true;
}
Set<String> nonSharedLocks = new HashSet<>();
- for (HiveLock lock : ctx.getHiveLocks()) {
+ for (HiveLock lock : context.getHiveLocks()) {
if (lock.mayContainComponents()) {
// The lock may have multiple components, e.g., DbHiveLock, hence we need
// to check for each of them
@@ -684,163 +427,6 @@ public class Driver implements IDriver {
return true;
}
- private void setTriggerContext(final String queryId) {
- final long queryStartTime;
- // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
- // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also
- // the time when query display object is created)
- if (driverContext.getQueryInfo() != null) {
- queryStartTime = driverContext.getQueryInfo().getBeginTime();
- } else {
- queryStartTime = driverContext.getQueryDisplay().getQueryStartTime();
- }
- WmContext wmContext = new WmContext(queryStartTime, queryId);
- ctx.setWmContext(wmContext);
- }
-
- /**
- * Last repl id should be captured before opening txn by current REPL DUMP operation.
- * This is needed to avoid losing data which are added/modified by concurrent txns when bootstrap
- * dump in progress.
- * @param conf Query configurations
- * @throws HiveException
- * @throws TException
- */
- private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException {
- // Last logged notification event id would be the last repl Id for the current REPl DUMP.
- Hive hiveDb = Hive.get();
- Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
- conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId);
- LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
- }
-
- private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
- if (checkConcurrency() && startImplicitTxn(driverContext.getTxnManager()) &&
- !driverContext.getTxnManager().isTxnOpen()) {
- String userFromUGI = getUserFromUGI();
- driverContext.getTxnManager().openTxn(ctx, userFromUGI, txnType);
- }
- }
-
- private void generateValidTxnList() throws LockException {
- // Record current valid txn list that will be used throughout the query
- // compilation and processing. We only do this if 1) a transaction
- // was already opened and 2) the list has not been recorded yet,
- // e.g., by an explicit open transaction command.
- driverContext.setValidTxnListsGenerated(false);
- String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
- if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) {
- try {
- recordValidTxns(driverContext.getTxnManager());
- driverContext.setValidTxnListsGenerated(true);
- } catch (LockException e) {
- LOG.error("Exception while acquiring valid txn list", e);
- throw e;
- }
- }
- }
-
- private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
- boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
- //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
- HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation();
- switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) {
- case COMMIT:
- case ROLLBACK:
- if (!txnManager.isTxnOpen()) {
- throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName());
- }
- case SWITCHDATABASE:
- case SET_AUTOCOMMIT:
- /**
- * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
- * semantics (or any other definition of autocommit) it should be done at session level.
- */
- case SHOWDATABASES:
- case SHOWTABLES:
- case SHOWCOLUMNS:
- case SHOWFUNCTIONS:
- case SHOWPARTITIONS:
- case SHOWLOCKS:
- case SHOWVIEWS:
- case SHOW_ROLES:
- case SHOW_ROLE_PRINCIPALS:
- case SHOW_COMPACTIONS:
- case SHOW_TRANSACTIONS:
- case ABORT_TRANSACTIONS:
- case KILL_QUERY:
- shouldOpenImplicitTxn = false;
- //this implies that no locks are needed for such a command
- }
- return shouldOpenImplicitTxn;
- }
-
- private void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger)
- throws CommandProcessorException {
- if (driverState.isAborted()) {
- String errorMessage = "FAILED: command has been interrupted: " + msg;
- CONSOLE.printError(errorMessage);
- if (hookContext != null) {
- try {
- invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
- } catch (Exception e) {
- LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
- }
- }
- throw createProcessorException(1000, errorMessage, "HY008", null);
- }
- }
-
- private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
- try {
- return Hive.get().dumpAndClearMetaCallTiming(phase);
- } catch (HiveException he) {
- LOG.warn("Caught exception attempting to write metadata call information
" + he, he);
- }
- return null;
- }
-
- /**
- * Returns EXPLAIN EXTENDED output for a semantically
- * analyzed query.
- *
- * @param sem semantic analyzer for analyzed query
- * @param plan query plan
- * @param astTree AST tree dump
- * @throws java.io.IOException
- */
- private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan,
- ASTNode astTree) throws IOException {
- String ret = null;
- ExplainTask task = new ExplainTask();
- task.initialize(driverContext.getQueryState(), plan, null, ctx);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
- try {
- List<Task<?>> rootTasks = sem.getAllRootTasks();
- if (driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) {
- JSONObject jsonPlan = task.getJSONPlan(
- null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(),
- plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
- if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null &&
- jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <=
- driverContext.getConf().getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) {
- ret = jsonPlan.toString();
- } else {
- ret = null;
- }
- } else {
- task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(),
- plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
- ret = baos.toString();
- }
- } catch (Exception e) {
- LOG.warn("Exception generating explain output: " + e, e);
- }
-
- return ret;
- }
-
@Override
public HiveConf getConf() {
return driverContext.getConf();
@@ -862,19 +448,6 @@ public class Driver implements IDriver {
return driverContext.getFetchTask();
}
- // Write the current set of valid transactions into the conf file
- private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
- String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
- if ((oldTxnString != null) && (oldTxnString.length() > 0)) {
- throw new IllegalStateException("calling recordValidTxn() more than once
in the same " +
- JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
- }
- ValidTxnList txnList = txnMgr.getValidTxns();
- String txnStr = txnList.toString();
- driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr);
- LOG.debug("Encoding valid txns info " + txnStr + " txnid:" +
txnMgr.getCurrentTxnId());
- }
-
// Write the current set of valid write ids for the operated acid tables into the conf file so
// that it can be read by the input format.
private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException {
@@ -979,18 +552,6 @@ public class Driver implements IDriver {
tables.put(fullTableName, tbl);
}
- private String getUserFromUGI() throws CommandProcessorException {
- // Don't use the userName member, as it may or may not have been set. Get the value from
- // conf, which calls into getUGI to figure out who the process is running as.
- try {
- return driverContext.getConf().getUser();
- } catch (IOException e) {
- String errorMessage = "FAILED: Error in determining user while acquiring
locks: " + e.getMessage();
- CONSOLE.printError(errorMessage, "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(10, errorMessage,
ErrorMsg.findSQLState(e.getMessage()), e);
- }
- }
-
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
* obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -1012,7 +573,7 @@ public class Driver implements IDriver {
return;
}
try {
- String userFromUGI = getUserFromUGI();
+ String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
// Set the table write id in all of the acid file sinks
if (!driverContext.getPlan().getAcidSinks().isEmpty()) {
@@ -1061,8 +622,8 @@ public class Driver implements IDriver {
/*It's imperative that {@code acquireLocks()} is called for all commands so that
HiveTxnManager can transition its state machine correctly*/
- driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), ctx, userFromUGI, driverState);
- final List<HiveLock> locks = ctx.getHiveLocks();
+ driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), context, userFromUGI, driverState);
+ final List<HiveLock> locks = context.getHiveLocks();
LOG.info("Operation {} obtained {} locks",
driverContext.getPlan().getOperation(),
((locks == null) ? 0 : locks.size()));
// This check is for controlling the correctness of the current state
@@ -1081,7 +642,8 @@ public class Driver implements IDriver {
} catch (Exception e) {
String errorMessage = "FAILED: Error in acquiring locks: " +
e.getMessage();
CONSOLE.printError(errorMessage, "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(10, errorMessage,
ErrorMsg.findSQLState(e.getMessage()), e);
+ throw DriverUtils.createProcessorException(driverContext, 10,
errorMessage, ErrorMsg.findSQLState(e.getMessage()),
+ e);
} finally {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
}
@@ -1112,7 +674,7 @@ public class Driver implements IDriver {
// releasing the locks.
driverContext.getConf().unset(ValidTxnList.VALID_TXNS_KEY);
driverContext.getConf().unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
- if(!checkConcurrency()) {
+ if (!DriverUtils.checkConcurrency(driverContext)) {
return;
}
if (txnMgr.isTxnOpen()) {
@@ -1129,14 +691,14 @@ public class Driver implements IDriver {
}
} else {
//since there is no tx, we only have locks for current query (if any)
- if (ctx != null && ctx.getHiveLocks() != null) {
- hiveLocks.addAll(ctx.getHiveLocks());
+ if (context != null && context.getHiveLocks() != null) {
+ hiveLocks.addAll(context.getHiveLocks());
}
txnMgr.releaseLocks(hiveLocks);
}
hiveLocks.clear();
- if (ctx != null) {
- ctx.setHiveLocks(null);
+ if (context != null) {
+ context.setHiveLocks(null);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
@@ -1269,7 +831,8 @@ public class Driver implements IDriver {
}
if (!success) {
String errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg();
- throw createProcessorException(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), errorMessage, null, null);
+ throw DriverUtils.createProcessorException(driverContext, ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), errorMessage, null, null);
}
try {
@@ -1301,7 +864,7 @@ public class Driver implements IDriver {
} else {
String errorMessage = "FAILED: Precompiled query has been cancelled
or closed.";
CONSOLE.printError(errorMessage);
- throw createProcessorException(12, errorMessage, null, null);
+ throw DriverUtils.createProcessorException(driverContext, 12,
errorMessage, null, null);
}
} else {
driverState.compiling();
@@ -1315,14 +878,15 @@ public class Driver implements IDriver {
boolean isFinishedWithError = true;
try {
HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(driverContext.getConf(),
- alreadyCompiled ? ctx.getCmd() : command);
+ alreadyCompiled ? context.getCmd() : command);
// Get all the driver run hooks and pre-execute them.
try {
driverContext.getHookRunner().runPreDriverHooks(hookContext);
} catch (Exception e) {
String errorMessage = "FAILED: Hive Internal Error: " +
Utilities.getNameMessage(e);
CONSOLE.printError(errorMessage + "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(12, errorMessage,
ErrorMsg.findSQLState(e.getMessage()), e);
+ throw DriverUtils.createProcessorException(driverContext, 12,
errorMessage,
+ ErrorMsg.findSQLState(e.getMessage()), e);
}
if (!alreadyCompiled) {
@@ -1340,9 +904,9 @@ public class Driver implements IDriver {
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
- ctx.setHiveTxnManager(driverContext.getTxnManager());
+ context.setHiveTxnManager(driverContext.getTxnManager());
- checkInterrupted("at acquiring the lock.", null, null);
+ DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null);
lockAndRespond();
@@ -1356,9 +920,9 @@ public class Driver implements IDriver {
// and then, we acquire locks. If snapshot is still valid, we continue as usual.
// But if snapshot is not valid, we recompile the query.
driverContext.setRetrial(true);
- driverContext.getBackupContext().addRewrittenStatementContext(ctx);
- driverContext.getBackupContext().setHiveLocks(ctx.getHiveLocks());
- ctx = driverContext.getBackupContext();
+ driverContext.getBackupContext().addRewrittenStatementContext(context);
+ driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
+ context = driverContext.getBackupContext();
driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, driverContext.getTxnManager().getValidTxns().toString());
if (driverContext.getPlan().hasAcidResourcesInQuery()) {
@@ -1384,7 +948,7 @@ public class Driver implements IDriver {
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
- ctx.setHiveTxnManager(driverContext.getTxnManager());
+ context.setHiveTxnManager(driverContext.getTxnManager());
}
} catch (LockException e) {
throw handleHiveException(e, 13);
@@ -1427,7 +991,8 @@ public class Driver implements IDriver {
} catch (Exception e) {
String errorMessage = "FAILED: Hive Internal Error: " +
Utilities.getNameMessage(e);
CONSOLE.printError(errorMessage + "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(12, errorMessage,
ErrorMsg.findSQLState(e.getMessage()), e);
+ throw DriverUtils.createProcessorException(driverContext, 12,
errorMessage,
+ ErrorMsg.findSQLState(e.getMessage()), e);
}
isFinishedWithError = false;
} finally {
@@ -1473,11 +1038,11 @@ public class Driver implements IDriver {
String sqlState = e.getCanonicalErrorMsg() != null ? e.getCanonicalErrorMsg().getSQLState() :
ErrorMsg.findSQLState(e.getMessage());
CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
- throw createProcessorException(ret, errorMessage, sqlState, e);
+ throw DriverUtils.createProcessorException(driverContext, ret, errorMessage, sqlState, e);
}
private boolean requiresLock() {
- if (!checkConcurrency()) {
+ if (!DriverUtils.checkConcurrency(driverContext)) {
LOG.info("Concurrency mode is disabled, not creating a lock manager");
return false;
}
@@ -1526,23 +1091,10 @@ public class Driver implements IDriver {
return false;
}
- private CommandProcessorException createProcessorException(int ret, String errorMessage, String sqlState,
- Throwable downstreamError) {
- SessionState.getPerfLogger().cleanupPerfLogMetrics();
- driverContext.getQueryDisplay().setErrorMessage(errorMessage);
- if (downstreamError != null && downstreamError instanceof HiveException) {
- ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
- if (em != null) {
- return new CommandProcessorException(ret, em.getErrorCode(), errorMessage, sqlState, downstreamError);
- }
- }
- return new CommandProcessorException(ret, -1, errorMessage, sqlState, downstreamError);
- }
-
private void useFetchFromCache(CacheEntry cacheEntry) {
// Change query FetchTask to use new location specified in results cache.
FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork());
- fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, ctx);
+ fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context);
driverContext.getPlan().setFetchTask(fetchTaskFromCache);
driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
}
@@ -1629,7 +1181,7 @@ public class Driver implements IDriver {
if (!driverState.isCompiled() && !driverState.isExecuting()) {
String errorMessage = "FAILED: unexpected driverstate: " + driverState
+ ", for query " + queryStr;
CONSOLE.printError(errorMessage);
- throw createProcessorException(1000, errorMessage, "HY008", null);
+ throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
} else {
driverState.executing();
}
@@ -1659,11 +1211,10 @@ public class Driver implements IDriver {
SessionState ss = SessionState.get();
// TODO: should this use getUserFromAuthenticator?
- hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), ctx.getPathToCS(),
- SessionState.get().getUserName(),
- ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId,
- ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger,
- driverContext.getQueryInfo(), ctx);
+ hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(),
+ context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(),
+ InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(),
+ ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context);
hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
driverContext.getHookRunner().runPreHooks(hookContext);
@@ -1693,12 +1244,12 @@ public class Driver implements IDriver {
// At any time, at most maxthreads tasks can be running
// The main thread polls the TaskRunners to check if they have finished.
- checkInterrupted("before running tasks.", hookContext, perfLogger);
+ DriverUtils.checkInterrupted(driverState, driverContext, "before running
tasks.", hookContext, perfLogger);
- taskQueue = new TaskQueue(ctx); // for canceling the query (should be
bound to session?)
+ taskQueue = new TaskQueue(context); // for canceling the query (should
be bound to session?)
taskQueue.prepare(driverContext.getPlan());
- ctx.setHDFSCleanup(true);
+ context.setHDFSCleanup(true);
SessionState.get().setMapRedStats(new LinkedHashMap<>());
SessionState.get().setStackTraces(new HashMap<>());
@@ -1755,7 +1306,8 @@ public class Driver implements IDriver {
TaskResult result = tskRun.getTaskResult();
int exitVal = result.getExitVal();
- checkInterrupted("when checking the execution result.", hookContext,
perfLogger);
+ DriverUtils.checkInterrupted(driverState, driverContext, "when
checking the execution result.", hookContext,
+ perfLogger);
if (exitVal != 0) {
Task<?> backupTask = tsk.getAndInitBackupTask();
@@ -1776,7 +1328,7 @@ public class Driver implements IDriver {
if (taskQueue.isShutdown()) {
errorMessage = "FAILED: Operation cancelled. " + errorMessage;
}
- invokeFailureHooks(perfLogger, hookContext,
+ DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext,
errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
String sqlState = "08S01";
@@ -1794,8 +1346,9 @@ public class Driver implements IDriver {
taskQueue.shutdown();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
- ctx.restoreOriginalTracker();
- throw createProcessorException(exitVal, errorMessage, sqlState, result.getTaskError());
+ context.restoreOriginalTracker();
+ throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
+ result.getTaskError());
}
}
@@ -1821,13 +1374,13 @@ public class Driver implements IDriver {
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
- ctx.restoreOriginalTracker();
+ context.restoreOriginalTracker();
if (taskQueue.isShutdown()) {
String errorMessage = "FAILED: Operation cancelled";
- invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
+ DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
CONSOLE.printError(errorMessage);
- throw createProcessorException(1000, errorMessage, "HY008", null);
+ throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
}
// remove incomplete outputs.
@@ -1861,9 +1414,10 @@ public class Driver implements IDriver {
} catch (Throwable e) {
executionError = true;
- checkInterrupted("during query execution: \n" + e.getMessage(),
hookContext, perfLogger);
+ DriverUtils.checkInterrupted(driverState, driverContext, "during query
execution: \n" + e.getMessage(),
+ hookContext, perfLogger);
- ctx.restoreOriginalTracker();
+ context.restoreOriginalTracker();
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
String.valueOf(12));
@@ -1872,13 +1426,13 @@ public class Driver implements IDriver {
String errorMessage = "FAILED: Hive Internal Error: " +
Utilities.getNameMessage(e);
if (hookContext != null) {
try {
- invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
+ DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e);
} catch (Exception t) {
LOG.warn("Failed to invoke failure hook", t);
}
}
CONSOLE.printError(errorMessage + "\n" +
StringUtils.stringifyException(e));
- throw createProcessorException(12, errorMessage, "08S01", e);
+ throw DriverUtils.createProcessorException(driverContext, 12,
errorMessage, "08S01", e);
} finally {
// Trigger query hooks after query completes its execution.
try {
@@ -1895,7 +1449,7 @@ public class Driver implements IDriver {
}
double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
- ImmutableMap<String, Long> executionHMSTimings = dumpMetaCallTimingWithoutEx("execution");
+ ImmutableMap<String, Long> executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
@@ -2016,15 +1570,6 @@ public class Driver implements IDriver {
return errorMessage;
}
- private void invokeFailureHooks(PerfLogger perfLogger,
- HookContext hookContext, String errorMessage, Throwable exception) throws Exception {
- hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
- hookContext.setErrorMessage(errorMessage);
- hookContext.setException(exception);
- // Get all the failure execution hooks and execute them.
- driverContext.getHookRunner().runFailureHooks(hookContext);
- }
-
/**
* Launches a new task
*
@@ -2055,7 +1600,7 @@ public class Driver implements IDriver {
taskQueue.incCurJobNo(1);
CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of
" + jobs);
}
- tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, ctx);
+ tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);
TaskRunner tskRun = new TaskRunner(tsk, taskQueue);
taskQueue.launching(tskRun);
@@ -2101,7 +1646,7 @@ public class Driver implements IDriver {
}
if (resStream == null) {
- resStream = ctx.getStream();
+ resStream = context.getStream();
}
if (resStream == null) {
return false;
@@ -2140,7 +1685,7 @@ public class Driver implements IDriver {
}
if (ss == Utilities.StreamStatus.EOF) {
- resStream = ctx.getStream();
+ resStream = context.getStream();
}
}
return true;
@@ -2158,9 +1703,9 @@ public class Driver implements IDriver {
throw new IOException("Error closing the current fetch task", e);
}
// FetchTask should not depend on the plan.
- driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, ctx);
+ driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context);
} else {
- ctx.resetStream();
+ context.resetStream();
resStream = null;
}
}
@@ -2199,7 +1744,7 @@ public class Driver implements IDriver {
private void releaseContext() {
try {
- if (ctx != null) {
+ if (context != null) {
boolean deleteResultDir = true;
// don't let context delete result dirs and scratch dirs if result was cached
if (driverContext.getCacheUsage() != null
@@ -2207,12 +1752,12 @@ public class Driver implements IDriver {
deleteResultDir = false;
}
- ctx.clear(deleteResultDir);
- if (ctx.getHiveLocks() != null) {
- hiveLocks.addAll(ctx.getHiveLocks());
- ctx.setHiveLocks(null);
+ context.clear(deleteResultDir);
+ if (context.getHiveLocks() != null) {
+ hiveLocks.addAll(context.getHiveLocks());
+ context.setHiveLocks(null);
}
- ctx = null;
+ context = null;
}
} catch (Exception e) {
LOG.debug("Exception while clearing the context ", e);
@@ -2271,14 +1816,14 @@ public class Driver implements IDriver {
// Close and release resources within a running query process. Since it runs under
// driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
// with the releases probably running in the other closing thread.
- private int closeInProcess(boolean destroyed) {
+ public int closeInProcess(boolean destroyed) {
releaseTaskQueue();
releasePlan();
releaseCachedResult();
releaseFetchTask();
releaseResStream();
releaseContext();
- if(destroyed) {
+ if (destroyed) {
if (!hiveLocks.isEmpty()) {
try {
releaseLocksAndCommitOrRollback(false);
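
The recurring change across the Driver.java hunks above is mechanical: instance
helpers such as checkInterrupted, invokeFailureHooks, and createProcessorException
become static DriverUtils methods that take driverState/driverContext explicitly,
so the extracted Compiler can reuse them. A minimal, self-contained sketch of that
pattern (the Ctx class and all names below are illustrative stand-ins, not Hive API):

    // Illustrative only: the instance-helper -> static-utility move.
    final class UtilsSketch {
      private UtilsSketch() {
        throw new UnsupportedOperationException("UtilsSketch should not be instantiated!");
      }

      // hypothetical stand-in for the shared driver state a helper needs
      static final class Ctx {
        final boolean aborted;
        Ctx(boolean aborted) { this.aborted = aborted; }
      }

      // before: a private Driver method reading this.driverState;
      // after: the state is a parameter, so Driver and Compiler can both call it
      static void checkInterrupted(Ctx ctx, String msg) {
        if (ctx.aborted) {
          throw new IllegalStateException("FAILED: command has been interrupted: " + msg);
        }
      }

      public static void main(String[] args) {
        checkInterrupted(new Ctx(false), "at acquiring the lock."); // no-op when not aborted
      }
    }
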
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 13e2f29..bbf7fe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -84,6 +84,14 @@ public class DriverContext {
return queryDisplay;
}
+ public String getQueryId() {
+ return queryDisplay.getQueryId();
+ }
+
+ public String getQueryString() {
+ return queryDisplay.getQueryString();
+ }
+
public QueryState getQueryState() {
return queryState;
}
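
The two new getters are plain delegation, sparing callers a hop through
QueryDisplay. A hedged sketch of the shape (QueryDisplayStub and the sample id
are made up for illustration; only getQueryId/getQueryString come from the patch):

    // Illustrative delegation sketch; not the real QueryDisplay class.
    final class QueryDisplayStub {
      private final String queryId;
      QueryDisplayStub(String queryId) { this.queryId = queryId; }
      String getQueryId() { return queryId; }
    }

    final class DriverContextSketch {
      private final QueryDisplayStub queryDisplay = new QueryDisplayStub("sample-query-id");

      // delegating getter: callers write getQueryId() instead of
      // getQueryDisplay().getQueryId()
      String getQueryId() { return queryDisplay.getQueryId(); }

      public static void main(String[] args) {
        System.out.println(new DriverContextSketch().getQueryId()); // sample-query-id
      }
    }
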
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index aa8e64d..1eacf69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -15,19 +15,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql;
+import java.io.IOException;
+
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DriverUtils {
- private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+/**
+ * Utility functions for the Driver.
+ */
+public final class DriverUtils {
+ private static final String CLASS_NAME = Driver.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+ private DriverUtils() {
+ throw new UnsupportedOperationException("DriverUtils should not be
instantiated!");
+ }
public static void runOnDriver(HiveConf conf, String user, SessionState sessionState,
String query) throws HiveException {
@@ -89,4 +105,58 @@ public class DriverUtils {
}
return sessionState;
}
+
+ public static void checkInterrupted(DriverState driverState, DriverContext driverContext, String msg,
+ HookContext hookContext, PerfLogger perfLogger) throws CommandProcessorException {
+ if (driverState.isAborted()) {
+ String errorMessage = "FAILED: command has been interrupted: " + msg;
+ CONSOLE.printError(errorMessage);
+ if (hookContext != null) {
+ try {
+ invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
+ } catch (Exception e) {
+ LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
+ }
+ }
+ throw createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+ }
+ }
+
+ public static void invokeFailureHooks(DriverContext driverContext, PerfLogger perfLogger, HookContext hookContext,
+ String errorMessage, Throwable exception) throws Exception {
+ hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
+ hookContext.setErrorMessage(errorMessage);
+ hookContext.setException(exception);
+ // Get all the failure execution hooks and execute them.
+ driverContext.getHookRunner().runFailureHooks(hookContext);
+ }
+
+ public static CommandProcessorException createProcessorException(DriverContext driverContext, int ret,
+ String errorMessage, String sqlState, Throwable downstreamError) {
+ SessionState.getPerfLogger().cleanupPerfLogMetrics();
+ driverContext.getQueryDisplay().setErrorMessage(errorMessage);
+ if (downstreamError != null && downstreamError instanceof HiveException) {
+ ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+ if (em != null) {
+ return new CommandProcessorException(ret, em.getErrorCode(), errorMessage, sqlState, downstreamError);
+ }
+ }
+ return new CommandProcessorException(ret, -1, errorMessage, sqlState, downstreamError);
+ }
+
+ public static boolean checkConcurrency(DriverContext driverContext) {
+ return driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ }
+
+ public static String getUserFromUGI(DriverContext driverContext) throws CommandProcessorException {
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ try {
+ return driverContext.getConf().getUser();
+ } catch (IOException e) {
+ String errorMessage = "FAILED: Error in determining user while acquiring
locks: " + e.getMessage();
+ CONSOLE.printError(errorMessage, "\n" +
StringUtils.stringifyException(e));
+ throw createProcessorException(driverContext, 10, errorMessage,
ErrorMsg.findSQLState(e.getMessage()), e);
+ }
+ }
}
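
createProcessorException above prefers the canonical error code carried by a
HiveException when one is attached, and falls back to -1 otherwise. A
self-contained sketch of just that selection logic (HiveEx and selectErrorCode
are illustrative stand-ins, not Hive's HiveException or its API):

    // Illustrative only: the error-code fallback used by createProcessorException.
    final class ProcessorExceptionSketch {
      static final class HiveEx extends Exception {
        final Integer canonicalCode; // null when no canonical ErrorMsg is attached
        HiveEx(String msg, Integer canonicalCode) { super(msg); this.canonicalCode = canonicalCode; }
      }

      static int selectErrorCode(Throwable downstreamError) {
        if (downstreamError instanceof HiveEx) {
          Integer code = ((HiveEx) downstreamError).canonicalCode;
          if (code != null) {
            return code;
          }
        }
        return -1; // no canonical error code available
      }

      public static void main(String[] args) {
        System.out.println(selectErrorCode(new HiveEx("semantic error", 10001))); // 10001
        System.out.println(selectErrorCode(new RuntimeException("oops")));        // -1
      }
    }
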
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4d79ebc..c1f94d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
@@ -46,10 +48,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -81,17 +87,13 @@ import com.google.common.annotations.VisibleForTesting;
*
**/
public class ExplainTask extends Task<ExplainWork> implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExplainTask.class.getName());
+
public static final String STAGE_DEPENDENCIES = "STAGE DEPENDENCIES";
private static final long serialVersionUID = 1L;
public static final String EXPL_COLUMN_NAME = "Explain";
private final Set<Operator<?>> visitedOps = new HashSet<Operator<?>>();
private boolean isLogical = false;
- protected final Logger LOG;
-
- public ExplainTask() {
- super();
- LOG = LoggerFactory.getLogger(this.getClass().getName());
- }
/*
* Below method returns the dependencies for the passed in query to EXPLAIN.
@@ -1292,4 +1294,44 @@ public class ExplainTask extends Task<ExplainWork>
implements Serializable {
public boolean canExecuteInParallel() {
return false;
}
+
+
+ /**
+ * Returns EXPLAIN EXTENDED output for a semantically analyzed query.
+ *
+ * @param sem semantic analyzer for analyzed query
+ * @param plan query plan
+ * @param astTree AST tree dump
+ */
+ public static String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, ASTNode astTree,
+ QueryState queryState, Context context, HiveConf conf) throws IOException {
+ String ret = null;
+ ExplainTask task = new ExplainTask();
+ task.initialize(queryState, plan, null, context);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ try {
+ List<Task<?>> rootTasks = sem.getAllRootTasks();
+ if (conf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) {
+ JSONObject jsonPlan = task.getJSONPlan(
+ null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(),
+ plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+ if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null &&
+ jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <=
+ conf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) {
+ ret = jsonPlan.toString();
+ } else {
+ ret = null;
+ }
+ } else {
+ task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(),
+ plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+ ret = baos.toString();
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception generating explain output: " + e, e);
+ }
+
+ return ret;
+ }
}
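
getExplainOutput renders the plan through a PrintStream backed by an in-memory
buffer and returns the buffered text (or the JSON string when the web UI graph
is enabled and small enough). A minimal sketch of that capture pattern, with a
placeholder println standing in for task.getJSONPlan(ps, ...):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    final class CaptureSketch {
      public static void main(String[] args) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        ps.println("STAGE DEPENDENCIES: ...");  // placeholder for the rendered plan
        ps.flush();
        String ret = baos.toString();           // same buffer-to-String conversion the patch uses
        System.out.print(ret);
      }
    }
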
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fa6c9d0..f4bd0f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -5389,6 +5389,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
metaCallTimeMap.clear();
}
+ public static ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String
phase) {
+ try {
+ return get().dumpAndClearMetaCallTiming(phase);
+ } catch (HiveException he) {
+ LOG.warn("Caught exception attempting to write metadata call information
" + he, he);
+ }
+ return null;
+ }
+
public ImmutableMap<String, Long> dumpAndClearMetaCallTiming(String phase) {
boolean phaseInfoLogged = false;
if (LOG.isDebugEnabled()) {
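
dumpMetaCallTimingWithoutEx wraps the checked-exception variant so bookkeeping
call sites (such as the execution-timing dump in Driver above) can log and
continue instead of propagating HiveException. A sketch of that wrapper
pattern, with illustrative stand-in types rather than the real Hive classes:

    import java.util.Collections;
    import java.util.Map;

    final class TimingSketch {
      // stand-in for dumpAndClearMetaCallTiming, which may throw
      static Map<String, Long> dumpAndClear(String phase) throws Exception {
        return Collections.singletonMap(phase + ".getTable", 42L); // fake timing entry
      }

      // the "WithoutEx" flavor: log the failure and return null
      static Map<String, Long> dumpWithoutEx(String phase) {
        try {
          return dumpAndClear(phase);
        } catch (Exception e) {
          System.err.println("Caught exception attempting to write metadata call information " + e);
          return null;
        }
      }

      public static void main(String[] args) {
        System.out.println(dumpWithoutEx("execution")); // {execution.getTable=42}
      }
    }
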