This is an automated email from the ASF dual-hosted git repository.

mgergely pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 23db35e  HIVE-22526 Extract Compiler from Driver (Miklos Gergely, 
reviewed by Zoltan Haindrich)
23db35e is described below

commit 23db35e092ce1d09c5993b45c8b0f790505fc1a5
Author: miklosgergely <[email protected]>
AuthorDate: Thu Nov 21 13:38:46 2019 +0100

    HIVE-22526 Extract Compiler from Driver (Miklos Gergely, reviewed by Zoltan 
Haindrich)
---
 .../java/org/apache/hadoop/hive/ql/Compiler.java   | 483 ++++++++++++++
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  | 721 ++++-----------------
 .../org/apache/hadoop/hive/ql/DriverContext.java   |   8 +
 .../org/apache/hadoop/hive/ql/DriverUtils.java     |  74 ++-
 .../apache/hadoop/hive/ql/exec/ExplainTask.java    |  54 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |   9 +
 6 files changed, 753 insertions(+), 596 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
new file mode 100644
index 0000000..a559d90
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveVariableSource;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import 
org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * The compiler compiles the command by creating a QueryPlan from a String command.
+ * Also opens a transaction if necessary.
+ */
+public class Compiler {
+  private static final String CLASS_NAME = Driver.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+  private final Context context;
+  private final DriverContext driverContext;
+  private final DriverState driverState;
+
+  private ASTNode tree;
+
+  public Compiler(Context context, DriverContext driverContext, DriverState 
driverState) {
+    this.context = context;
+    this.driverContext = driverContext;
+    this.driverState = driverState;
+  }
+
+  /**
+   * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted;
+   *             it should be set to true if the compile is called within another method like runInternal,
+   *             so that the close is handled in that method instead.
+   */
+  public QueryPlan compile(String rawCommand, boolean deferClose) throws 
CommandProcessorException {
+    initialize(rawCommand);
+
+    boolean compileError = false;
+    boolean parsed = false;
+    QueryPlan plan = null;
+    try {
+      DriverUtils.checkInterrupted(driverState, driverContext, "before parsing 
and analysing the query", null, null);
+
+      parse();
+      parsed = true;
+      BaseSemanticAnalyzer sem = analyze();
+
+      DriverUtils.checkInterrupted(driverState, driverContext, "after 
analyzing query.", null, null);
+
+      plan = createPlan(sem);
+      authorize(sem);
+      explainOutput(sem, plan);
+    } catch (CommandProcessorException cpe) {
+      compileError = true;
+      throw cpe;
+    } catch (Exception e) {
+      compileError = true;
+      DriverUtils.checkInterrupted(driverState, driverContext, "during query 
compilation: " + e.getMessage(), null,
+          null);
+      handleException(e);
+    } finally {
+      cleanUp(compileError, parsed, deferClose);
+    }
+
+    return plan;
+  }
+
+  private void initialize(String rawCommand) throws CommandProcessorException {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
+    driverState.compilingWithLocking();
+
+    VariableSubstitution variableSubstitution = new VariableSubstitution(new 
HiveVariableSource() {
+      @Override
+      public Map<String, String> getHiveVariable() {
+        return SessionState.get().getHiveVariables();
+      }
+    });
+    String command = variableSubstitution.substitute(driverContext.getConf(), 
rawCommand);
+
+    String queryStr = command;
+    try {
+      // command should be redacted to avoid logging sensitive data
+      queryStr = HookUtils.redactLogString(driverContext.getConf(), command);
+    } catch (Exception e) {
+      LOG.warn("WARNING! Query command could not be redacted." + e);
+    }
+
+    DriverUtils.checkInterrupted(driverState, driverContext, "at beginning of 
compilation.", null, null);
+
+    context.setCmd(command);
+    driverContext.getQueryDisplay().setQueryStr(queryStr);
+    LOG.info("Compiling command(queryId=" + driverContext.getQueryId() + "): " 
+ queryStr);
+
+    driverContext.getConf().setQueryString(queryStr);
+    // FIXME: side effect will leave the last query set at the session level
+    if (SessionState.get() != null) {
+      SessionState.get().getConf().setQueryString(queryStr);
+      SessionState.get().setupQueryCurrentTimestamp();
+    }
+  }
+
+  private void parse() throws ParseException {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
+
+    // Trigger query hook before parsing
+    driverContext.getHookRunner().runBeforeParseHook(context.getCmd());
+
+    boolean success = false;
+    try {
+      tree = ParseUtils.parse(context.getCmd(), context);
+      success = true;
+    } finally {
+      driverContext.getHookRunner().runAfterParseHook(context.getCmd(), 
!success);
+    }
+    SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
+  }
+
+  private BaseSemanticAnalyzer analyze() throws Exception {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
+
+    driverContext.getHookRunner().runBeforeCompileHook(context.getCmd());
+
+    // clear CurrentFunctionsInUse set, to capture new set of functions
+    // that SemanticAnalyzer finds are in use
+    SessionState.get().getCurrentFunctionsInUse().clear();
+
+    // Flush the metastore cache.  This assures that we don't pick up objects 
from a previous
+    // query running in this same thread.  This has to be done after we get 
our semantic
+    // analyzer (this is when the connection to the metastore is made) but 
before we analyze,
+    // because at that point we need access to the objects.
+    Hive.get().getMSC().flushCache();
+
+    driverContext.setBackupContext(new Context(context));
+    boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
+
+    HiveSemanticAnalyzerHookContext hookCtx = new 
HiveSemanticAnalyzerHookContextImpl();
+    if (executeHooks) {
+      hookCtx.setConf(driverContext.getConf());
+      hookCtx.setUserName(SessionState.get().getUserName());
+      hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
+      hookCtx.setCommand(context.getCmd());
+      
hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation());
+
+      tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree);
+    }
+
+    // SemanticAnalyzerFactory also sets the hive operation in query state
+    BaseSemanticAnalyzer sem = 
SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree);
+
+    if (!driverContext.isRetrial()) {
+      if ((driverContext.getQueryState().getHiveOperation() != null) &&
+          
driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP))
 {
+        setLastReplIdForDump(driverContext.getQueryState().getConf());
+      }
+      driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), 
tree));
+      openTransaction(driverContext.getTxnType());
+
+      generateValidTxnList();
+    }
+
+    // Do semantic analysis and plan generation
+    sem.analyze(tree, context);
+
+    if (executeHooks) {
+      hookCtx.update(sem);
+      driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, 
sem.getAllRootTasks());
+    }
+
+    LOG.info("Semantic Analysis Completed (retrial = {})", 
driverContext.isRetrial());
+
+    // Retrieve information about cache usage for the query.
+    if 
(driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED))
 {
+      driverContext.setCacheUsage(sem.getCacheUsage());
+    }
+
+    // validate the plan
+    sem.validate();
+
+    SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
+
+    return sem;
+  }
+
+  /**
+   * Last repl id should be captured before opening txn by current REPL DUMP operation.
+   * This is needed to avoid losing data which are added/modified by concurrent txns while a bootstrap
+   * dump is in progress.
+   * @param conf Query configurations
+   * @throws HiveException
+   * @throws TException
+   */
+  private void setLastReplIdForDump(HiveConf conf) throws HiveException, 
TException {
+    // Last logged notification event id would be the last repl Id for the current REPL DUMP.
+    Hive hiveDb = Hive.get();
+    Long lastReplId = 
hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId);
+    LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
+  }
+
+  private void openTransaction(TxnType txnType) throws LockException, 
CommandProcessorException {
+    if (DriverUtils.checkConcurrency(driverContext) && 
startImplicitTxn(driverContext.getTxnManager()) &&
+        !driverContext.getTxnManager().isTxnOpen()) {
+      String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
+      driverContext.getTxnManager().openTxn(context, userFromUGI, txnType);
+    }
+  }
+
+  private boolean startImplicitTxn(HiveTxnManager txnManager) throws 
LockException {
+    //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+    HiveOperation hiveOperation = 
driverContext.getQueryState().getHiveOperation();
+    switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) {
+    case COMMIT:
+    case ROLLBACK:
+      if (!txnManager.isTxnOpen()) {
+        throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, 
hiveOperation.getOperationName());
+      }
+    case SWITCHDATABASE:
+    case SET_AUTOCOMMIT:
+      /**
+       * autocommit is here for completeness.  TM doesn't use it.  If we want 
to support JDBC
+       * semantics (or any other definition of autocommit) it should be done 
at session level.
+       */
+    case SHOWDATABASES:
+    case SHOWTABLES:
+    case SHOWCOLUMNS:
+    case SHOWFUNCTIONS:
+    case SHOWPARTITIONS:
+    case SHOWLOCKS:
+    case SHOWVIEWS:
+    case SHOW_ROLES:
+    case SHOW_ROLE_PRINCIPALS:
+    case SHOW_COMPACTIONS:
+    case SHOW_TRANSACTIONS:
+    case ABORT_TRANSACTIONS:
+    case KILL_QUERY:
+      return false;
+      //this implies that no locks are needed for such a command
+    default:
+      return !context.isExplainPlan();
+    }
+  }
+
+  private void generateValidTxnList() throws LockException {
+    // Record current valid txn list that will be used throughout the query
+    // compilation and processing. We only do this if 1) a transaction
+    // was already opened and 2) the list has not been recorded yet,
+    // e.g., by an explicit open transaction command.
+    driverContext.setValidTxnListsGenerated(false);
+    String currentTxnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null 
|| currentTxnString.isEmpty())) {
+      try {
+        recordValidTxns(driverContext.getTxnManager());
+        driverContext.setValidTxnListsGenerated(true);
+      } catch (LockException e) {
+        LOG.error("Exception while acquiring valid txn list", e);
+        throw e;
+      }
+    }
+  }
+
+  // Write the current set of valid transactions into the conf
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
+    String oldTxnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
+    if ((oldTxnString != null) && (oldTxnString.length() > 0)) {
+      throw new IllegalStateException("calling recordValidTxn() more than once 
in the same " +
+              JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+    }
+    ValidTxnList txnList = txnMgr.getValidTxns();
+    String txnStr = txnList.toString();
+    driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr);
+    LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + 
txnMgr.getCurrentTxnId());
+  }
+
+  private QueryPlan createPlan(BaseSemanticAnalyzer sem) {
+    // get the output schema
+    setSchema(sem);
+    QueryPlan plan = new QueryPlan(driverContext.getQueryString(), sem,
+        driverContext.getQueryDisplay().getQueryStartTime(), 
driverContext.getQueryId(),
+        driverContext.getQueryState().getHiveOperation(), 
driverContext.getSchema());
+    // save the optimized plan and sql for the explain
+    plan.setOptimizedCBOPlan(context.getCalcitePlan());
+    plan.setOptimizedQueryString(context.getOptimizedSql());
+
+    driverContext.getConf().set("mapreduce.workflow.id", "hive_" + 
driverContext.getQueryId());
+    driverContext.getConf().set("mapreduce.workflow.name", 
driverContext.getQueryString());
+
+    // initialize FetchTask right here
+    if (plan.getFetchTask() != null) {
+      plan.getFetchTask().initialize(driverContext.getQueryState(), plan, 
null, context);
+    }
+
+    return plan;
+  }
+
+  /**
+   * Create a Schema with fields represented with native Hive types and set it on the driver context.
+   */
+  private void setSchema(BaseSemanticAnalyzer sem) {
+    Schema schema = new Schema();
+
+    // If we have a plan, prefer its logical result schema if it's available; 
otherwise, try digging out a fetch task;
+    // failing that, give up.
+    if (sem == null) {
+      LOG.info("No semantic analyzer, using empty schema.");
+    } else if (sem.getResultSchema() != null) {
+      List<FieldSchema> lst = sem.getResultSchema();
+      schema = new Schema(lst, null);
+    } else if (sem.getFetchTask() != null) {
+      FetchTask ft = sem.getFetchTask();
+      TableDesc td = ft.getTblDesc();
+      // partitioned tables don't have tableDesc set on the FetchTask. Instead they have a list of PartitionDesc
+      // objects, each with a table desc. Let's try to fetch the desc for the first partition and use its deserializer.
+      if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != 
null) {
+        if (ft.getWork().getPartDesc().size() > 0) {
+          td = ft.getWork().getPartDesc().get(0).getTableDesc();
+        }
+      }
+
+      if (td == null) {
+        LOG.info("No returning schema, using empty schema");
+      } else {
+        String tableName = "result";
+        List<FieldSchema> lst = null;
+        try {
+          lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, 
td.getDeserializer(driverContext.getConf()));
+        } catch (Exception e) {
+          LOG.warn("Error getting schema: " + 
StringUtils.stringifyException(e));
+        }
+        if (lst != null) {
+          schema = new Schema(lst, null);
+        }
+      }
+    }
+
+    LOG.info("Created Hive schema: " + schema);
+    driverContext.setSchema(schema);
+  }
+
+  private void authorize(BaseSemanticAnalyzer sem) throws HiveException, 
CommandProcessorException {
+    // do the authorization check
+    if (!sem.skipAuthorization() &&
+        HiveConf.getBoolVar(driverContext.getConf(), 
HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
+
+      try {
+        SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, 
PerfLogger.DO_AUTHORIZATION);
+        // Authorization check for kill query will be in KillQueryImpl,
+        // as both the admin and the operation owner can perform the operation,
+        // which is not directly supported by the authorizer.
+        if (driverContext.getQueryState().getHiveOperation() != 
HiveOperation.KILL_QUERY) {
+          
CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(),
 sem, context.getCmd());
+        }
+      } catch (AuthorizationException authExp) {
+        CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". 
Use SHOW GRANT to get more details.");
+        throw DriverUtils.createProcessorException(driverContext, 403, 
authExp.getMessage(), "42000", null);
+      } finally {
+        SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, 
PerfLogger.DO_AUTHORIZATION);
+      }
+    }
+  }
+
+  private void explainOutput(BaseSemanticAnalyzer sem, QueryPlan plan) throws 
IOException {
+    if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) ||
+        
driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) 
{
+      String explainOutput = ExplainTask.getExplainOutput(sem, plan, tree, 
driverContext.getQueryState(),
+          context, driverContext.getConf());
+      if (explainOutput != null) {
+        if 
(driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
+          LOG.info("EXPLAIN output for queryid " + driverContext.getQueryId() 
+ " : " + explainOutput);
+        }
+        if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() &&
+            
driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) 
{
+          driverContext.getQueryDisplay().setExplainPlan(explainOutput);
+        }
+      }
+    }
+  }
+
+  private void handleException(Exception e) throws CommandProcessorException {
+    ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
+    String errorMessage = "FAILED: " + e.getClass().getSimpleName();
+    if (error != ErrorMsg.GENERIC_ERROR) {
+      errorMessage += " [Error "  + error.getErrorCode()  + "]:";
+    }
+
+    // HIVE-4889
+    if ((e instanceof IllegalArgumentException) && e.getMessage() == null && 
e.getCause() != null) {
+      errorMessage += " " + e.getCause().getMessage();
+    } else {
+      errorMessage += " " + e.getMessage();
+    }
+
+    if (error == ErrorMsg.TXNMGR_NOT_ACID) {
+      errorMessage += ". Failed command: " + driverContext.getQueryString();
+    }
+
+    CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
+    throw DriverUtils.createProcessorException(driverContext, 
error.getErrorCode(), errorMessage, error.getSQLState(),
+        e);
+  }
+
+  private void cleanUp(boolean compileError, boolean parsed, boolean 
deferClose) {
+    // Trigger post compilation hook. Note that if the compilation fails here then
+    // the before/after execution hooks will never be executed.
+    if (parsed) {
+      try {
+        
driverContext.getHookRunner().runAfterCompilationHook(context.getCmd(), 
compileError);
+      } catch (Exception e) {
+        LOG.warn("Failed when invoking query after-compilation hook.", e);
+      }
+    }
+
+    double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, 
PerfLogger.COMPILE) / 1000.00;
+    ImmutableMap<String, Long> compileHMSTimings = 
Hive.dumpMetaCallTimingWithoutEx("compilation");
+    
driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, 
compileHMSTimings);
+
+    if (driverState.isAborted()) {
+      driverState.compilationInterruptedWithLocking(deferClose);
+      LOG.info("Compiling command(queryId={}) has been interrupted after {} 
seconds", driverContext.getQueryId(),
+          duration);
+    } else {
+      driverState.compilationFinishedWithLocking(compileError);
+      LOG.info("Completed compiling command(queryId={}); Time taken: {} 
seconds", driverContext.getQueryId(),
+          duration);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a28bf16..7549144 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.hive.ql;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -48,10 +46,6 @@ import 
org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.conf.HiveVariableSource;
-import org.apache.hadoop.hive.conf.VariableSubstitution;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
@@ -73,12 +67,10 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.TaskResult;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -89,22 +81,14 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
-import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
 import org.apache.hadoop.hive.ql.parse.HiveTableName;
-import org.apache.hadoop.hive.ql.parse.ParseException;
-import org.apache.hadoop.hive.ql.parse.ParseUtils;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -112,7 +96,6 @@ import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import 
org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer;
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -122,8 +105,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.common.util.TxnIdUtils;
-import org.apache.thrift.TException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,7 +124,7 @@ public class Driver implements IDriver {
   private ByteStream.Output bos = new ByteStream.Output();
 
   private DataInput resStream;
-  private Context ctx;
+  private Context context;
   private final DriverContext driverContext;
   private TaskQueue taskQueue;
   private final List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
@@ -153,10 +134,6 @@ public class Driver implements IDriver {
 
   private DriverState driverState = new DriverState();
 
-  private boolean checkConcurrency() {
-    return 
driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
-  }
-
   @Override
   public Schema getSchema() {
     return driverContext.getSchema();
@@ -168,61 +145,11 @@ public class Driver implements IDriver {
 
   @Override
   public Context getContext() {
-    return ctx;
+    return context;
   }
 
   public PlanMapper getPlanMapper() {
-    return ctx.getPlanMapper();
-  }
-
-  /**
-   * Get a Schema with fields represented with native Hive types
-   */
-  private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
-    Schema schema = null;
-
-    // If we have a plan, prefer its logical result schema if it's
-    // available; otherwise, try digging out a fetch task; failing that,
-    // give up.
-    if (sem == null) {
-      // can't get any info without a plan
-    } else if (sem.getResultSchema() != null) {
-      List<FieldSchema> lst = sem.getResultSchema();
-      schema = new Schema(lst, null);
-    } else if (sem.getFetchTask() != null) {
-      FetchTask ft = sem.getFetchTask();
-      TableDesc td = ft.getTblDesc();
-      // partitioned tables don't have tableDesc set on the FetchTask. Instead
-      // they have a list of PartitionDesc objects, each with a table desc.
-      // Let's
-      // try to fetch the desc for the first partition and use it's
-      // deserializer.
-      if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != 
null) {
-        if (ft.getWork().getPartDesc().size() > 0) {
-          td = ft.getWork().getPartDesc().get(0).getTableDesc();
-        }
-      }
-
-      if (td == null) {
-        LOG.info("No returning schema.");
-      } else {
-        String tableName = "result";
-        List<FieldSchema> lst = null;
-        try {
-          lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, 
td.getDeserializer(conf));
-        } catch (Exception e) {
-          LOG.warn("Error getting schema: " + 
StringUtils.stringifyException(e));
-        }
-        if (lst != null) {
-          schema = new Schema(lst, null);
-        }
-      }
-    }
-    if (schema == null) {
-      schema = new Schema();
-    }
-    LOG.info("Returning Hive schema: " + schema);
-    return schema;
+    return context.getPlanMapper();
   }
 
   /**
@@ -242,7 +169,7 @@ public class Driver implements IDriver {
   // or compile another query
   public Driver(HiveConf conf, Context ctx, LineageState lineageState) {
     this(getNewQueryState(conf, lineageState), null);
-    this.ctx = ctx;
+    context = ctx;
   }
 
   public Driver(QueryState queryState) {
@@ -293,261 +220,30 @@ public class Driver implements IDriver {
   // runInternal, which defers the close to the called in that method.
   @VisibleForTesting
   public void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException {
-    createTransactionManager();
+    preparForCompile(resetTaskIds);
 
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
-    driverState.compilingWithLocking();
+    Compiler compiler = new Compiler(context, driverContext, driverState);
+    QueryPlan plan = compiler.compile(command, deferClose);
+    driverContext.setPlan(plan);
 
-    command = new VariableSubstitution(new HiveVariableSource() {
-      @Override
-      public Map<String, String> getHiveVariable() {
-        return SessionState.get().getHiveVariables();
-      }
-    }).substitute(driverContext.getConf(), command);
-
-    String queryStr = command;
+    compileFinished(deferClose);
+  }
 
-    try {
-      // command should be redacted to avoid to logging sensitive data
-      queryStr = HookUtils.redactLogString(driverContext.getConf(), command);
-    } catch (Exception e) {
-      LOG.warn("WARNING! Query command could not be redacted." + e);
+  private void compileFinished(boolean deferClose) {
+    if (DriverState.getDriverState().isAborted() && !deferClose) {
+      closeInProcess(true);
     }
+  }
 
-    checkInterrupted("at beginning of compilation.", null, null);
-
-    if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {
-      // close the existing ctx etc before compiling a new query, but does not 
destroy driver
-      closeInProcess(false);
-    }
+  private void preparForCompile(boolean resetTaskIds) throws CommandProcessorException {
+    createTransactionManager();
+    DriverState.setDriverState(driverState);
+    prepareContext();
+    setQueryId();
 
     if (resetTaskIds) {
       TaskFactory.resetId();
     }
-
-    DriverState.setDriverState(driverState);
-
-    final String queryId = 
Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ?
-        QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId();
-
-    SparkSession ss = SessionState.get().getSparkSession();
-    if (ss != null) {
-      ss.onQuerySubmission(queryId);
-    }
-
-    if (ctx != null) {
-      setTriggerContext(queryId);
-    }
-    //save some info for webUI for use after plan is freed
-    driverContext.getQueryDisplay().setQueryStr(queryStr);
-    driverContext.getQueryDisplay().setQueryId(queryId);
-
-    LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);
-
-    driverContext.getConf().setQueryString(queryStr);
-    // FIXME: sideeffect will leave the last query set at the session level
-    if (SessionState.get() != null) {
-      SessionState.get().getConf().setQueryString(queryStr);
-      SessionState.get().setupQueryCurrentTimestamp();
-    }
-
-    // Whether any error occurred during query compilation. Used for query 
lifetime hook.
-    boolean compileError = false;
-    boolean parseError = false;
-
-    try {
-      checkInterrupted("before parsing and analysing the query", null, null);
-
-      if (ctx == null) {
-        ctx = new Context(driverContext.getConf());
-        setTriggerContext(queryId);
-      }
-
-      ctx.setHiveTxnManager(driverContext.getTxnManager());
-      ctx.setStatsSource(driverContext.getStatsSource());
-      ctx.setCmd(command);
-      ctx.setHDFSCleanup(true);
-
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
-
-      // Trigger query hook before compilation
-      driverContext.getHookRunner().runBeforeParseHook(command);
-
-      ASTNode tree;
-      try {
-        tree = ParseUtils.parse(command, ctx);
-      } catch (ParseException e) {
-        parseError = true;
-        throw e;
-      } finally {
-        driverContext.getHookRunner().runAfterParseHook(command, parseError);
-      }
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
-
-      driverContext.getHookRunner().runBeforeCompileHook(command);
-      // clear CurrentFunctionsInUse set, to capture new set of functions
-      // that SemanticAnalyzer finds are in use
-      SessionState.get().getCurrentFunctionsInUse().clear();
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
-
-      // Flush the metastore cache.  This assures that we don't pick up 
objects from a previous
-      // query running in this same thread.  This has to be done after we get 
our semantic
-      // analyzer (this is when the connection to the metastore is made) but 
before we analyze,
-      // because at that point we need access to the objects.
-      Hive.get().getMSC().flushCache();
-
-      driverContext.setBackupContext(new Context(ctx));
-      boolean executeHooks = 
driverContext.getHookRunner().hasPreAnalyzeHooks();
-
-      HiveSemanticAnalyzerHookContext hookCtx = new 
HiveSemanticAnalyzerHookContextImpl();
-      if (executeHooks) {
-        hookCtx.setConf(driverContext.getConf());
-        hookCtx.setUserName(SessionState.get().getUserName());
-        hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
-        hookCtx.setCommand(command);
-        
hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation());
-
-        tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree);
-      }
-
-      // Do semantic analysis and plan generation
-      BaseSemanticAnalyzer sem = 
SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree);
-
-      if (!driverContext.isRetrial()) {
-        if ((driverContext.getQueryState().getHiveOperation() != null) &&
-            
driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP))
 {
-          setLastReplIdForDump(driverContext.getQueryState().getConf());
-        }
-        driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), 
tree));
-        openTransaction(driverContext.getTxnType());
-
-        generateValidTxnList();
-      }
-
-      sem.analyze(tree, ctx);
-
-      if (executeHooks) {
-        hookCtx.update(sem);
-        driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, 
sem.getAllRootTasks());
-      }
-
-      LOG.info("Semantic Analysis Completed (retrial = {})", 
driverContext.isRetrial());
-
-      // Retrieve information about cache usage for the query.
-      if 
(driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED))
 {
-        driverContext.setCacheUsage(sem.getCacheUsage());
-      }
-
-      // validate the plan
-      sem.validate();
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
-
-      checkInterrupted("after analyzing query.", null, null);
-
-      // get the output schema
-      driverContext.setSchema(getSchema(sem, driverContext.getConf()));
-      QueryPlan plan = new QueryPlan(queryStr, sem, 
driverContext.getQueryDisplay().getQueryStartTime(), queryId,
-          driverContext.getQueryState().getHiveOperation(), 
driverContext.getSchema());
-      // save the optimized plan and sql for the explain
-      plan.setOptimizedCBOPlan(ctx.getCalcitePlan());
-      plan.setOptimizedQueryString(ctx.getOptimizedSql());
-      driverContext.setPlan(plan);
-
-      driverContext.getConf().set("mapreduce.workflow.id", "hive_" + queryId);
-      driverContext.getConf().set("mapreduce.workflow.name", queryStr);
-
-      // initialize FetchTask right here
-      if (plan.getFetchTask() != null) {
-        plan.getFetchTask().initialize(driverContext.getQueryState(), plan, 
null, ctx);
-      }
-
-      //do the authorization check
-      if (!sem.skipAuthorization() &&
-          HiveConf.getBoolVar(driverContext.getConf(), 
HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
-
-        try {
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
-          // Authorization check for kill query will be in KillQueryImpl
-          // As both admin or operation owner can perform the operation.
-          // Which is not directly supported in authorizer
-          if (driverContext.getQueryState().getHiveOperation() != 
HiveOperation.KILL_QUERY) {
-            
CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(),
 sem, command);
-          }
-        } catch (AuthorizationException authExp) {
-          CONSOLE.printError("Authorization failed:" + authExp.getMessage() + 
". Use SHOW GRANT to get more details.");
-          throw createProcessorException(403, authExp.getMessage(), "42000", 
null);
-        } finally {
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
-        }
-      }
-
-      if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)
-          || 
driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) 
{
-        String explainOutput = getExplainOutput(sem, plan, tree);
-        if (explainOutput != null) {
-          if 
(driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {
-            LOG.info("EXPLAIN output for queryid " + queryId + " : " + 
explainOutput);
-          }
-          if (driverContext.getConf().isWebUiQueryInfoCacheEnabled()
-              && 
driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) 
{
-            driverContext.getQueryDisplay().setExplainPlan(explainOutput);
-          }
-        }
-      }
-    } catch (CommandProcessorException cpe) {
-      throw cpe;
-    } catch (Exception e) {
-      checkInterrupted("during query compilation: " + e.getMessage(), null, 
null);
-
-      compileError = true;
-      ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
-      String errorMessage = "FAILED: " + e.getClass().getSimpleName();
-      if (error != ErrorMsg.GENERIC_ERROR) {
-        errorMessage += " [Error "  + error.getErrorCode()  + "]:";
-      }
-
-      // HIVE-4889
-      if ((e instanceof IllegalArgumentException) && e.getMessage() == null && 
e.getCause() != null) {
-        errorMessage += " " + e.getCause().getMessage();
-      } else {
-        errorMessage += " " + e.getMessage();
-      }
-
-      if (error == ErrorMsg.TXNMGR_NOT_ACID) {
-        errorMessage += ". Failed command: " + queryStr;
-      }
-
-      CONSOLE.printError(errorMessage, "\n" + 
StringUtils.stringifyException(e));
-      throw createProcessorException(error.getErrorCode(), errorMessage, 
error.getSQLState(), e);
-    } finally {
-      // Trigger post compilation hook. Note that if the compilation fails 
here then
-      // before/after execution hook will never be executed.
-      if (!parseError) {
-        try {
-          driverContext.getHookRunner().runAfterCompilationHook(command, 
compileError);
-        } catch (Exception e) {
-          LOG.warn("Failed when invoking query after-compilation hook.", e);
-        }
-      }
-
-      double duration = perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.COMPILE)/1000.00;
-      ImmutableMap<String, Long> compileHMSTimings = 
dumpMetaCallTimingWithoutEx("compilation");
-      
driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, 
compileHMSTimings);
-
-      boolean isInterrupted = driverState.isAborted();
-      if (isInterrupted && !deferClose) {
-        closeInProcess(true);
-      }
-
-      if (isInterrupted) {
-        driverState.compilationInterruptedWithLocking(deferClose);
-        LOG.info("Compiling command(queryId=" + queryId + ") has been 
interrupted after " + duration + " seconds");
-      } else {
-        driverState.compilationFinishedWithLocking(compileError);
-        LOG.info("Completed compiling command(queryId=" + queryId + "); Time 
taken: " + duration + " seconds");
-      }
-    }
   }
 
   private void createTransactionManager() throws CommandProcessorException {
@@ -582,8 +278,55 @@ public class Driver implements IDriver {
       String errorMessage = "FAILED: " + e.getClass().getSimpleName() + " [Error "  + error.getErrorCode()  + "]:";
 
       CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
-      throw createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e);
+      throw DriverUtils.createProcessorException(driverContext, error.getErrorCode(), errorMessage, error.getSQLState(),
+          e);
+    }
+  }
+
+  private void prepareContext() throws CommandProcessorException {
+    if (context != null && context.getExplainAnalyze() != AnalyzeState.RUNNING) {
+      // close the existing ctx etc before compiling a new query, but does not destroy driver
+      closeInProcess(false);
+    }
+
+    try {
+      if (context == null) {
+        context = new Context(driverContext.getConf());
+      }
+    } catch (IOException e) {
+      throw new CommandProcessorException(e);
+    }
+
+    context.setHiveTxnManager(driverContext.getTxnManager());
+    context.setStatsSource(driverContext.getStatsSource());
+    context.setHDFSCleanup(true);
+  }
+
+  private void setQueryId() {
+    String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ?
+        QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId();
+
+    SparkSession ss = SessionState.get().getSparkSession();
+    if (ss != null) {
+      ss.onQuerySubmission(queryId);
     }
+    driverContext.getQueryDisplay().setQueryId(queryId);
+
+    setTriggerContext(queryId);
+  }
+
+  private void setTriggerContext(String queryId) {
+    long queryStartTime;
+    // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
+    // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also
+    // the time when query display object is created)
+    if (driverContext.getQueryInfo() != null) {
+      queryStartTime = driverContext.getQueryInfo().getBeginTime();
+    } else {
+      queryStartTime = driverContext.getQueryDisplay().getQueryStartTime();
+    }
+    WmContext wmContext = new WmContext(queryStartTime, queryId);
+    context.setWmContext(wmContext);
   }
 
   // Checks whether txn list has been invalidated while planning the query.
@@ -606,12 +349,12 @@ public class Driver implements IDriver {
     // 2) Get locks that are relevant:
     // - Exclusive for INSERT OVERWRITE.
     // - Semi-shared for UPDATE/DELETE.
-    if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+    if (context.getHiveLocks() == null || context.getHiveLocks().isEmpty()) {
       // Nothing to check
       return true;
     }
     Set<String> nonSharedLocks = new HashSet<>();
-    for (HiveLock lock : ctx.getHiveLocks()) {
+    for (HiveLock lock : context.getHiveLocks()) {
       if (lock.mayContainComponents()) {
         // The lock may have multiple components, e.g., DbHiveLock, hence we need
         // to check for each of them
@@ -684,163 +427,6 @@ public class Driver implements IDriver {
     return true;
   }
 
-  private void setTriggerContext(final String queryId) {
-    final long queryStartTime;
-    // query info is created by SQLOperation which will have start time of the 
operation. When JDBC Statement is not
-    // used queryInfo will be null, in which case we take creation of Driver 
instance as query start time (which is also
-    // the time when query display object is created)
-    if (driverContext.getQueryInfo() != null) {
-      queryStartTime = driverContext.getQueryInfo().getBeginTime();
-    } else {
-      queryStartTime = driverContext.getQueryDisplay().getQueryStartTime();
-    }
-    WmContext wmContext = new WmContext(queryStartTime, queryId);
-    ctx.setWmContext(wmContext);
-  }
-
-  /**
-   * Last repl id should be captured before opening txn by current REPL DUMP 
operation.
-   * This is needed to avoid losing data which are added/modified by 
concurrent txns when bootstrap
-   * dump in progress.
-   * @param conf Query configurations
-   * @throws HiveException
-   * @throws TException
-   */
-  private void setLastReplIdForDump(HiveConf conf) throws HiveException, 
TException {
-    // Last logged notification event id would be the last repl Id for the 
current REPl DUMP.
-    Hive hiveDb = Hive.get();
-    Long lastReplId = 
hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
-    conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId);
-    LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
-  }
-
-  private void openTransaction(TxnType txnType) throws LockException, 
CommandProcessorException {
-    if (checkConcurrency() && startImplicitTxn(driverContext.getTxnManager()) 
&&
-        !driverContext.getTxnManager().isTxnOpen()) {
-      String userFromUGI = getUserFromUGI();
-      driverContext.getTxnManager().openTxn(ctx, userFromUGI, txnType);
-    }
-  }
-
-  private void generateValidTxnList() throws LockException {
-    // Record current valid txn list that will be used throughout the query
-    // compilation and processing. We only do this if 1) a transaction
-    // was already opened and 2) the list has not been recorded yet,
-    // e.g., by an explicit open transaction command.
-    driverContext.setValidTxnListsGenerated(false);
-    String currentTxnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
-    if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null 
|| currentTxnString.isEmpty())) {
-      try {
-        recordValidTxns(driverContext.getTxnManager());
-        driverContext.setValidTxnListsGenerated(true);
-      } catch (LockException e) {
-        LOG.error("Exception while acquiring valid txn list", e);
-        throw e;
-      }
-    }
-  }
-
-  private boolean startImplicitTxn(HiveTxnManager txnManager) throws 
LockException {
-    boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
-    //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
-    HiveOperation hiveOperation = 
driverContext.getQueryState().getHiveOperation();
-    switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) {
-    case COMMIT:
-    case ROLLBACK:
-      if (!txnManager.isTxnOpen()) {
-        throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, 
hiveOperation.getOperationName());
-      }
-    case SWITCHDATABASE:
-    case SET_AUTOCOMMIT:
-      /**
-       * autocommit is here for completeness.  TM doesn't use it.  If we want 
to support JDBC
-       * semantics (or any other definition of autocommit) it should be done 
at session level.
-       */
-    case SHOWDATABASES:
-    case SHOWTABLES:
-    case SHOWCOLUMNS:
-    case SHOWFUNCTIONS:
-    case SHOWPARTITIONS:
-    case SHOWLOCKS:
-    case SHOWVIEWS:
-    case SHOW_ROLES:
-    case SHOW_ROLE_PRINCIPALS:
-    case SHOW_COMPACTIONS:
-    case SHOW_TRANSACTIONS:
-    case ABORT_TRANSACTIONS:
-    case KILL_QUERY:
-      shouldOpenImplicitTxn = false;
-      //this implies that no locks are needed for such a command
-    }
-    return shouldOpenImplicitTxn;
-  }
-
-  private void checkInterrupted(String msg, HookContext hookContext, 
PerfLogger perfLogger)
-      throws CommandProcessorException {
-    if (driverState.isAborted()) {
-      String errorMessage = "FAILED: command has been interrupted: " + msg;
-      CONSOLE.printError(errorMessage);
-      if (hookContext != null) {
-        try {
-          invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
-        } catch (Exception e) {
-          LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
-        }
-      }
-      throw createProcessorException(1000, errorMessage, "HY008", null);
-    }
-  }
-
-  private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) 
{
-    try {
-      return Hive.get().dumpAndClearMetaCallTiming(phase);
-    } catch (HiveException he) {
-      LOG.warn("Caught exception attempting to write metadata call information 
" + he, he);
-    }
-    return null;
-  }
-
-  /**
-   * Returns EXPLAIN EXTENDED output for a semantically
-   * analyzed query.
-   *
-   * @param sem semantic analyzer for analyzed query
-   * @param plan query plan
-   * @param astTree AST tree dump
-   * @throws java.io.IOException
-   */
-  private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan,
-      ASTNode astTree) throws IOException {
-    String ret = null;
-    ExplainTask task = new ExplainTask();
-    task.initialize(driverContext.getQueryState(), plan, null, ctx);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PrintStream ps = new PrintStream(baos);
-    try {
-      List<Task<?>> rootTasks = sem.getAllRootTasks();
-      if 
(driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) {
-        JSONObject jsonPlan = task.getJSONPlan(
-            null, rootTasks, sem.getFetchTask(), true, true, true, 
sem.getCboInfo(),
-            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
-        if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null &&
-            jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <=
-                
driverContext.getConf().getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) {
-          ret = jsonPlan.toString();
-        } else {
-          ret = null;
-        }
-      } else {
-        task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, 
sem.getCboInfo(),
-            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
-        ret = baos.toString();
-      }
-    } catch (Exception e) {
-      LOG.warn("Exception generating explain output: " + e, e);
-    }
-
-    return ret;
-  }
-
   @Override
   public HiveConf getConf() {
     return driverContext.getConf();
@@ -862,19 +448,6 @@ public class Driver implements IDriver {
     return driverContext.getFetchTask();
   }
 
-  // Write the current set of valid transactions into the conf file
-  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
-    String oldTxnString = 
driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY);
-    if ((oldTxnString != null) && (oldTxnString.length() > 0)) {
-      throw new IllegalStateException("calling recordValidTxn() more than once 
in the same " +
-              JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
-    }
-    ValidTxnList txnList = txnMgr.getValidTxns();
-    String txnStr = txnList.toString();
-    driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr);
-    LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + 
txnMgr.getCurrentTxnId());
-  }
-
   // Write the current set of valid write ids for the operated acid tables into the conf file so
   // that it can be read by the input format.
   private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException {
@@ -979,18 +552,6 @@ public class Driver implements IDriver {
     tables.put(fullTableName, tbl);
   }
 
-  private String getUserFromUGI() throws CommandProcessorException {
-    // Don't use the userName member, as it may or may not have been set.  Get 
the value from
-    // conf, which calls into getUGI to figure out who the process is running 
as.
-    try {
-      return driverContext.getConf().getUser();
-    } catch (IOException e) {
-      String errorMessage = "FAILED: Error in determining user while acquiring 
locks: " + e.getMessage();
-      CONSOLE.printError(errorMessage, "\n" + 
StringUtils.stringifyException(e));
-      throw createProcessorException(10, errorMessage, 
ErrorMsg.findSQLState(e.getMessage()), e);
-    }
-  }
-
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
    * obtained from the inputs and outputs populated by the compiler.  Locking strategy depends on
@@ -1012,7 +573,7 @@ public class Driver implements IDriver {
       return;
     }
     try {
-      String userFromUGI = getUserFromUGI();
+      String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
 
       // Set the table write id in all of the acid file sinks
       if (!driverContext.getPlan().getAcidSinks().isEmpty()) {
@@ -1061,8 +622,8 @@ public class Driver implements IDriver {
 
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
-      driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), ctx, userFromUGI, driverState);
-      final List<HiveLock> locks = ctx.getHiveLocks();
+      driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), context, userFromUGI, driverState);
+      final List<HiveLock> locks = context.getHiveLocks();
       LOG.info("Operation {} obtained {} locks", driverContext.getPlan().getOperation(),
           ((locks == null) ? 0 : locks.size()));
       // This check is for controlling the correctness of the current state
@@ -1081,7 +642,8 @@ public class Driver implements IDriver {
     } catch (Exception e) {
       String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
       CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
-      throw createProcessorException(10, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e);
+      throw DriverUtils.createProcessorException(driverContext, 10, errorMessage, ErrorMsg.findSQLState(e.getMessage()),
+          e);
     } finally {
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
     }
@@ -1112,7 +674,7 @@ public class Driver implements IDriver {
     // releasing the locks.
     driverContext.getConf().unset(ValidTxnList.VALID_TXNS_KEY);
     
driverContext.getConf().unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
-    if(!checkConcurrency()) {
+    if (!DriverUtils.checkConcurrency(driverContext)) {
       return;
     }
     if (txnMgr.isTxnOpen()) {
@@ -1129,14 +691,14 @@ public class Driver implements IDriver {
       }
     } else {
       //since there is no tx, we only have locks for current query (if any)
-      if (ctx != null && ctx.getHiveLocks() != null) {
-        hiveLocks.addAll(ctx.getHiveLocks());
+      if (context != null && context.getHiveLocks() != null) {
+        hiveLocks.addAll(context.getHiveLocks());
       }
       txnMgr.releaseLocks(hiveLocks);
     }
     hiveLocks.clear();
-    if (ctx != null) {
-      ctx.setHiveLocks(null);
+    if (context != null) {
+      context.setHiveLocks(null);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
@@ -1269,7 +831,8 @@ public class Driver implements IDriver {
       }
       if (!success) {
         String errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg();
-        throw createProcessorException(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), errorMessage, null, null);
+        throw DriverUtils.createProcessorException(driverContext, ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(),
+            errorMessage, null, null);
       }
 
       try {
@@ -1301,7 +864,7 @@ public class Driver implements IDriver {
         } else {
           String errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
           CONSOLE.printError(errorMessage);
-          throw createProcessorException(12, errorMessage, null, null);
+          throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, null, null);
         }
       } else {
         driverState.compiling();
@@ -1315,14 +878,15 @@ public class Driver implements IDriver {
     boolean isFinishedWithError = true;
     try {
       HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(driverContext.getConf(),
-          alreadyCompiled ? ctx.getCmd() : command);
+          alreadyCompiled ? context.getCmd() : command);
       // Get all the driver run hooks and pre-execute them.
       try {
         driverContext.getHookRunner().runPreDriverHooks(hookContext);
       } catch (Exception e) {
         String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
         CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
-        throw createProcessorException(12, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e);
+        throw DriverUtils.createProcessorException(driverContext, 12, errorMessage,
+            ErrorMsg.findSQLState(e.getMessage()), e);
       }
 
       if (!alreadyCompiled) {
@@ -1340,9 +904,9 @@ public class Driver implements IDriver {
       // the reason that we set the txn manager for the cxt here is because each
       // query has its own ctx object. The txn mgr is shared across the
       // same instance of Driver, which can run multiple queries.
-      ctx.setHiveTxnManager(driverContext.getTxnManager());
+      context.setHiveTxnManager(driverContext.getTxnManager());
 
-      checkInterrupted("at acquiring the lock.", null, null);
+      DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null);
 
       lockAndRespond();
 
@@ -1356,9 +920,9 @@ public class Driver implements IDriver {
           // and then, we acquire locks. If snapshot is still valid, we continue as usual.
           // But if snapshot is not valid, we recompile the query.
           driverContext.setRetrial(true);
-          driverContext.getBackupContext().addRewrittenStatementContext(ctx);
-          driverContext.getBackupContext().setHiveLocks(ctx.getHiveLocks());
-          ctx = driverContext.getBackupContext();
+          driverContext.getBackupContext().addRewrittenStatementContext(context);
+          driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
+          context = driverContext.getBackupContext();
           driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
               driverContext.getTxnManager().getValidTxns().toString());
           if (driverContext.getPlan().hasAcidResourcesInQuery()) {
@@ -1384,7 +948,7 @@ public class Driver implements IDriver {
           // the reason that we set the txn manager for the cxt here is because each
           // query has its own ctx object. The txn mgr is shared across the
           // same instance of Driver, which can run multiple queries.
-          ctx.setHiveTxnManager(driverContext.getTxnManager());
+          context.setHiveTxnManager(driverContext.getTxnManager());
         }
       } catch (LockException e) {
         throw handleHiveException(e, 13);
@@ -1427,7 +991,8 @@ public class Driver implements IDriver {
       } catch (Exception e) {
         String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
         CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
-        throw createProcessorException(12, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e);
+        throw DriverUtils.createProcessorException(driverContext, 12, errorMessage,
+            ErrorMsg.findSQLState(e.getMessage()), e);
       }
       isFinishedWithError = false;
     } finally {
@@ -1473,11 +1038,11 @@ public class Driver implements IDriver {
     String sqlState = e.getCanonicalErrorMsg() != null ?
         e.getCanonicalErrorMsg().getSQLState() : ErrorMsg.findSQLState(e.getMessage());
     CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
-    throw createProcessorException(ret, errorMessage, sqlState, e);
+    throw DriverUtils.createProcessorException(driverContext, ret, errorMessage, sqlState, e);
   }
 
   private boolean requiresLock() {
-    if (!checkConcurrency()) {
+    if (!DriverUtils.checkConcurrency(driverContext)) {
       LOG.info("Concurrency mode is disabled, not creating a lock manager");
       return false;
     }
@@ -1526,23 +1091,10 @@ public class Driver implements IDriver {
     return false;
   }
 
-  private CommandProcessorException createProcessorException(int ret, String 
errorMessage, String sqlState,
-      Throwable downstreamError) {
-    SessionState.getPerfLogger().cleanupPerfLogMetrics();
-    driverContext.getQueryDisplay().setErrorMessage(errorMessage);
-    if (downstreamError != null && downstreamError instanceof HiveException) {
-      ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
-      if (em != null) {
-        return new CommandProcessorException(ret, em.getErrorCode(), 
errorMessage, sqlState, downstreamError);
-      }
-    }
-    return new CommandProcessorException(ret, -1, errorMessage, sqlState, 
downstreamError);
-  }
-
   private void useFetchFromCache(CacheEntry cacheEntry) {
     // Change query FetchTask to use new location specified in results cache.
     FetchTask fetchTaskFromCache = (FetchTask) 
TaskFactory.get(cacheEntry.getFetchWork());
-    fetchTaskFromCache.initialize(driverContext.getQueryState(), 
driverContext.getPlan(), null, ctx);
+    fetchTaskFromCache.initialize(driverContext.getQueryState(), 
driverContext.getPlan(), null, context);
     driverContext.getPlan().setFetchTask(fetchTaskFromCache);
     driverContext.setCacheUsage(new 
CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
   }
@@ -1629,7 +1181,7 @@ public class Driver implements IDriver {
       if (!driverState.isCompiled() && !driverState.isExecuting()) {
         String errorMessage = "FAILED: unexpected driverstate: " + driverState 
+ ", for query " + queryStr;
         CONSOLE.printError(errorMessage);
-        throw createProcessorException(1000, errorMessage, "HY008", null);
+        throw DriverUtils.createProcessorException(driverContext, 1000, 
errorMessage, "HY008", null);
       } else {
         driverState.executing();
       }
@@ -1659,11 +1211,10 @@ public class Driver implements IDriver {
       SessionState ss = SessionState.get();
 
       // TODO: should this use getUserFromAuthenticator?
-      hookContext = new PrivateHookContext(driverContext.getPlan(), 
driverContext.getQueryState(), ctx.getPathToCS(),
-          SessionState.get().getUserName(),
-          ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), 
operationId,
-          ss.getSessionId(), Thread.currentThread().getName(), 
ss.isHiveServerQuery(), perfLogger,
-          driverContext.getQueryInfo(), ctx);
+      hookContext = new PrivateHookContext(driverContext.getPlan(), 
driverContext.getQueryState(),
+          context.getPathToCS(), SessionState.get().getUserName(), 
ss.getUserIpAddress(),
+          InetAddress.getLocalHost().getHostAddress(), operationId, 
ss.getSessionId(), Thread.currentThread().getName(),
+          ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), 
context);
       hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
 
       driverContext.getHookRunner().runPreHooks(hookContext);
@@ -1693,12 +1244,12 @@ public class Driver implements IDriver {
       // At any time, at most maxthreads tasks can be running
       // The main thread polls the TaskRunners to check if they have finished.
 
-      checkInterrupted("before running tasks.", hookContext, perfLogger);
+      DriverUtils.checkInterrupted(driverState, driverContext, "before running 
tasks.", hookContext, perfLogger);
 
-      taskQueue = new TaskQueue(ctx); // for canceling the query (should be 
bound to session?)
+      taskQueue = new TaskQueue(context); // for canceling the query (should 
be bound to session?)
       taskQueue.prepare(driverContext.getPlan());
 
-      ctx.setHDFSCleanup(true);
+      context.setHDFSCleanup(true);
 
       SessionState.get().setMapRedStats(new LinkedHashMap<>());
       SessionState.get().setStackTraces(new HashMap<>());
@@ -1755,7 +1306,8 @@ public class Driver implements IDriver {
         TaskResult result = tskRun.getTaskResult();
 
         int exitVal = result.getExitVal();
-        checkInterrupted("when checking the execution result.", hookContext, 
perfLogger);
+        DriverUtils.checkInterrupted(driverState, driverContext, "when 
checking the execution result.", hookContext,
+            perfLogger);
 
         if (exitVal != 0) {
           Task<?> backupTask = tsk.getAndInitBackupTask();
@@ -1776,7 +1328,7 @@ public class Driver implements IDriver {
             if (taskQueue.isShutdown()) {
               errorMessage = "FAILED: Operation cancelled. " + errorMessage;
             }
-            invokeFailureHooks(perfLogger, hookContext,
+            DriverUtils.invokeFailureHooks(driverContext, perfLogger, 
hookContext,
               errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), 
result.getTaskError());
             String sqlState = "08S01";
 
@@ -1794,8 +1346,9 @@ public class Driver implements IDriver {
             taskQueue.shutdown();
             // in case we decided to run everything in local mode, restore
             // the jobtracker setting to its initial value
-            ctx.restoreOriginalTracker();
-            throw createProcessorException(exitVal, errorMessage, sqlState, 
result.getTaskError());
+            context.restoreOriginalTracker();
+            throw DriverUtils.createProcessorException(driverContext, exitVal, 
errorMessage, sqlState,
+                result.getTaskError());
           }
         }
 
@@ -1821,13 +1374,13 @@ public class Driver implements IDriver {
 
       // in case we decided to run everything in local mode, restore
       // the jobtracker setting to its initial value
-      ctx.restoreOriginalTracker();
+      context.restoreOriginalTracker();
 
       if (taskQueue.isShutdown()) {
         String errorMessage = "FAILED: Operation cancelled";
-        invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
+        DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, 
errorMessage, null);
         CONSOLE.printError(errorMessage);
-        throw createProcessorException(1000, errorMessage, "HY008", null);
+        throw DriverUtils.createProcessorException(driverContext, 1000, 
errorMessage, "HY008", null);
       }
 
       // remove incomplete outputs.
@@ -1861,9 +1414,10 @@ public class Driver implements IDriver {
     } catch (Throwable e) {
       executionError = true;
 
-      checkInterrupted("during query execution: \n" + e.getMessage(), 
hookContext, perfLogger);
+      DriverUtils.checkInterrupted(driverState, driverContext, "during query 
execution: \n" + e.getMessage(),
+          hookContext, perfLogger);
 
-      ctx.restoreOriginalTracker();
+      context.restoreOriginalTracker();
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setQueryProperty(queryId, 
Keys.QUERY_RET_CODE,
             String.valueOf(12));
@@ -1872,13 +1426,13 @@ public class Driver implements IDriver {
       String errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
       if (hookContext != null) {
         try {
-          invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
+          DriverUtils.invokeFailureHooks(driverContext, perfLogger, 
hookContext, errorMessage, e);
         } catch (Exception t) {
           LOG.warn("Failed to invoke failure hook", t);
         }
       }
       CONSOLE.printError(errorMessage + "\n" + 
StringUtils.stringifyException(e));
-      throw createProcessorException(12, errorMessage, "08S01", e);
+      throw DriverUtils.createProcessorException(driverContext, 12, 
errorMessage, "08S01", e);
     } finally {
       // Trigger query hooks after query completes its execution.
       try {
@@ -1895,7 +1449,7 @@ public class Driver implements IDriver {
       }
       double duration = perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.DRIVER_EXECUTE)/1000.00;
 
-      ImmutableMap<String, Long> executionHMSTimings = 
dumpMetaCallTimingWithoutEx("execution");
+      ImmutableMap<String, Long> executionHMSTimings = 
Hive.dumpMetaCallTimingWithoutEx("execution");
       
driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, 
executionHMSTimings);
 
       Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
@@ -2016,15 +1570,6 @@ public class Driver implements IDriver {
     return errorMessage;
   }
 
-  private void invokeFailureHooks(PerfLogger perfLogger,
-      HookContext hookContext, String errorMessage, Throwable exception) 
throws Exception {
-    hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
-    hookContext.setErrorMessage(errorMessage);
-    hookContext.setException(exception);
-    // Get all the failure execution hooks and execute them.
-    driverContext.getHookRunner().runFailureHooks(hookContext);
-  }
-
   /**
    * Launches a new task
    *
@@ -2055,7 +1600,7 @@ public class Driver implements IDriver {
       taskQueue.incCurJobNo(1);
       CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of 
" + jobs);
     }
-    tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), 
taskQueue, ctx);
+    tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), 
taskQueue, context);
     TaskRunner tskRun = new TaskRunner(tsk, taskQueue);
 
     taskQueue.launching(tskRun);
@@ -2101,7 +1646,7 @@ public class Driver implements IDriver {
     }
 
     if (resStream == null) {
-      resStream = ctx.getStream();
+      resStream = context.getStream();
     }
     if (resStream == null) {
       return false;
@@ -2140,7 +1685,7 @@ public class Driver implements IDriver {
       }
 
       if (ss == Utilities.StreamStatus.EOF) {
-        resStream = ctx.getStream();
+        resStream = context.getStream();
       }
     }
     return true;
@@ -2158,9 +1703,9 @@ public class Driver implements IDriver {
         throw new IOException("Error closing the current fetch task", e);
       }
       // FetchTask should not depend on the plan.
-      driverContext.getFetchTask().initialize(driverContext.getQueryState(), 
null, null, ctx);
+      driverContext.getFetchTask().initialize(driverContext.getQueryState(), 
null, null, context);
     } else {
-      ctx.resetStream();
+      context.resetStream();
       resStream = null;
     }
   }
@@ -2199,7 +1744,7 @@ public class Driver implements IDriver {
 
   private void releaseContext() {
     try {
-      if (ctx != null) {
+      if (context != null) {
         boolean deleteResultDir = true;
         // don't let context delete result dirs and scratch dirs if result was 
cached
         if (driverContext.getCacheUsage() != null
@@ -2207,12 +1752,12 @@ public class Driver implements IDriver {
           deleteResultDir = false;
 
         }
-        ctx.clear(deleteResultDir);
-        if (ctx.getHiveLocks() != null) {
-          hiveLocks.addAll(ctx.getHiveLocks());
-          ctx.setHiveLocks(null);
+        context.clear(deleteResultDir);
+        if (context.getHiveLocks() != null) {
+          hiveLocks.addAll(context.getHiveLocks());
+          context.setHiveLocks(null);
         }
-        ctx = null;
+        context = null;
       }
     } catch (Exception e) {
       LOG.debug("Exception while clearing the context ", e);
@@ -2271,14 +1816,14 @@ public class Driver implements IDriver {
   // Close and release resources within a running query process. Since it runs 
under
   // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race 
condition
   // with the releases probably running in the other closing thread.
-  private int closeInProcess(boolean destroyed) {
+  public int closeInProcess(boolean destroyed) {
     releaseTaskQueue();
     releasePlan();
     releaseCachedResult();
     releaseFetchTask();
     releaseResStream();
     releaseContext();
-    if(destroyed) {
+    if (destroyed) {
       if (!hiveLocks.isEmpty()) {
         try {
           releaseLocksAndCommitOrRollback(false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 13e2f29..bbf7fe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -84,6 +84,14 @@ public class DriverContext {
     return queryDisplay;
   }
 
+  public String getQueryId() {
+    return queryDisplay.getQueryId();
+  }
+
+  public String getQueryString() {
+    return queryDisplay.getQueryString();
+  }
+
   public QueryState getQueryState() {
     return queryState;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index aa8e64d..1eacf69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -15,19 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DriverUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+/**
+ * Utility functions for the Driver.
+ */
+public final class DriverUtils {
+  private static final String CLASS_NAME = Driver.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+  private DriverUtils() {
+    throw new UnsupportedOperationException("DriverUtils should not be 
instantiated!");
+  }
 
   public static void runOnDriver(HiveConf conf, String user, SessionState 
sessionState,
       String query) throws HiveException {
@@ -89,4 +105,58 @@ public class DriverUtils {
     }
     return sessionState;
   }
+
+  public static void checkInterrupted(DriverState driverState, DriverContext 
driverContext, String msg,
+      HookContext hookContext, PerfLogger perfLogger) throws 
CommandProcessorException {
+    if (driverState.isAborted()) {
+      String errorMessage = "FAILED: command has been interrupted: " + msg;
+      CONSOLE.printError(errorMessage);
+      if (hookContext != null) {
+        try {
+          invokeFailureHooks(driverContext, perfLogger, hookContext, 
errorMessage, null);
+        } catch (Exception e) {
+          LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
+        }
+      }
+      throw createProcessorException(driverContext, 1000, errorMessage, 
"HY008", null);
+    }
+  }
+
+  public static void invokeFailureHooks(DriverContext driverContext, 
PerfLogger perfLogger, HookContext hookContext,
+      String errorMessage, Throwable exception) throws Exception {
+    hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
+    hookContext.setErrorMessage(errorMessage);
+    hookContext.setException(exception);
+    // Get all the failure execution hooks and execute them.
+    driverContext.getHookRunner().runFailureHooks(hookContext);
+  }
+
+  public static CommandProcessorException 
createProcessorException(DriverContext driverContext, int ret,
+      String errorMessage, String sqlState, Throwable downstreamError) {
+    SessionState.getPerfLogger().cleanupPerfLogMetrics();
+    driverContext.getQueryDisplay().setErrorMessage(errorMessage);
+    if (downstreamError != null && downstreamError instanceof HiveException) {
+      ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+      if (em != null) {
+        return new CommandProcessorException(ret, em.getErrorCode(), 
errorMessage, sqlState, downstreamError);
+      }
+    }
+    return new CommandProcessorException(ret, -1, errorMessage, sqlState, 
downstreamError);
+  }
+
+  public static boolean checkConcurrency(DriverContext driverContext) {
+    return 
driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+  }
+
+  public static String getUserFromUGI(DriverContext driverContext) throws 
CommandProcessorException {
+    // Don't use the userName member, as it may or may not have been set.  Get 
the value from
+    // conf, which calls into getUGI to figure out who the process is running 
as.
+    try {
+      return driverContext.getConf().getUser();
+    } catch (IOException e) {
+      String errorMessage = "FAILED: Error in determining user while acquiring 
locks: " + e.getMessage();
+      CONSOLE.printError(errorMessage, "\n" + 
StringUtils.stringifyException(e));
+      throw createProcessorException(driverContext, 10, errorMessage, 
ErrorMsg.findSQLState(e.getMessage()), e);
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4d79ebc..c1f94d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec;
 
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
@@ -46,10 +48,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import 
org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -81,17 +87,13 @@ import com.google.common.annotations.VisibleForTesting;
  *
  **/
 public class ExplainTask extends Task<ExplainWork> implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExplainTask.class.getName());
+
   public static final String STAGE_DEPENDENCIES = "STAGE DEPENDENCIES";
   private static final long serialVersionUID = 1L;
   public static final String EXPL_COLUMN_NAME = "Explain";
   private final Set<Operator<?>> visitedOps = new HashSet<Operator<?>>();
   private boolean isLogical = false;
-  protected final Logger LOG;
-
-  public ExplainTask() {
-    super();
-    LOG = LoggerFactory.getLogger(this.getClass().getName());
-  }
 
   /*
    * Below method returns the dependencies for the passed in query to EXPLAIN.
@@ -1292,4 +1294,44 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
   public boolean canExecuteInParallel() {
     return false;
   }
+
+
+  /**
+   * Returns EXPLAIN EXTENDED output for a semantically analyzed query.
+   *
+   * @param sem semantic analyzer for analyzed query
+   * @param plan query plan
+   * @param astTree AST tree dump
+   */
+  public static String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan 
plan, ASTNode astTree,
+      QueryState queryState, Context context, HiveConf conf) throws 
IOException {
+    String ret = null;
+    ExplainTask task = new ExplainTask();
+    task.initialize(queryState, plan, null, context);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    try {
+      List<Task<?>> rootTasks = sem.getAllRootTasks();
+      if (conf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) {
+        JSONObject jsonPlan = task.getJSONPlan(
+            null, rootTasks, sem.getFetchTask(), true, true, true, 
sem.getCboInfo(),
+            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+        if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null &&
+            jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <=
+                conf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) {
+          ret = jsonPlan.toString();
+        } else {
+          ret = null;
+        }
+      } else {
+        task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, 
sem.getCboInfo(),
+            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+        ret = baos.toString();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception generating explain output: " + e, e);
+    }
+
+    return ret;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fa6c9d0..f4bd0f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -5389,6 +5389,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
     metaCallTimeMap.clear();
   }
 
+  public static ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String 
phase) {
+    try {
+      return get().dumpAndClearMetaCallTiming(phase);
+    } catch (HiveException he) {
+      LOG.warn("Caught exception attempting to write metadata call information 
" + he, he);
+    }
+    return null;
+  }
+
   public ImmutableMap<String, Long> dumpAndClearMetaCallTiming(String phase) {
     boolean phaseInfoLogged = false;
     if (LOG.isDebugEnabled()) {

Reply via email to