pvary commented on a change in pull request #1222:
URL: https://github.com/apache/hive/pull/1222#discussion_r453154237



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
##########
@@ -139,205 +119,215 @@ public Driver(QueryState queryState, QueryInfo 
queryInfo, HiveTxnManager txnMana
     driverTxnHandler = new DriverTxnHandler(this, driverContext, driverState);
   }
 
-  /**
-   * Compile a new query, but potentially reset taskID counter.  Not resetting 
task counter
-   * is useful for generating re-entrant QL queries.
-   * @param command  The HiveQL query to compile
-   * @param resetTaskIds Resets taskID counter if true.
-   * @return 0 for ok
-   */
-  public int compile(String command, boolean resetTaskIds) {
-    try {
-      compile(command, resetTaskIds, false);
-      return 0;
-    } catch (CommandProcessorException cpr) {
-      return cpr.getErrorCode();
-    }
+  @Override
+  public Context getContext() {
+    return context;
   }
 
-  // deferClose indicates if the close/destroy should be deferred when the 
process has been
-  // interrupted, it should be set to true if the compile is called within 
another method like
-  // runInternal, which defers the close to the called in that method.
-  @VisibleForTesting
-  public void compile(String command, boolean resetTaskIds, boolean 
deferClose) throws CommandProcessorException {
-    preparForCompile(resetTaskIds);
-
-    Compiler compiler = new Compiler(context, driverContext, driverState);
-    QueryPlan plan = compiler.compile(command, deferClose);
-    driverContext.setPlan(plan);
-
-    compileFinished(deferClose);
+  @Override
+  public HiveConf getConf() {
+    return driverContext.getConf();
   }
 
-  private void compileFinished(boolean deferClose) {
-    if (DriverState.getDriverState().isAborted() && !deferClose) {
-      closeInProcess(true);
-    }
+  @Override
+  public CommandProcessorResponse run() throws CommandProcessorException {
+    return run(null, true);
   }
 
-  private void preparForCompile(boolean resetTaskIds) throws 
CommandProcessorException {
-    driverTxnHandler.createTxnManager();
-    DriverState.setDriverState(driverState);
-    prepareContext();
-    setQueryId();
+  @Override
+  public CommandProcessorResponse run(String command) throws 
CommandProcessorException {
+    return run(command, false);
+  }
 
-    if (resetTaskIds) {
-      TaskFactory.resetId();
+  private CommandProcessorResponse run(String command, boolean 
alreadyCompiled) throws CommandProcessorException {
+    try {
+      runInternal(command, alreadyCompiled);
+      return new CommandProcessorResponse(getSchema(), null);
+    } catch (CommandProcessorException cpe) {
+      processRunException(cpe);
+      throw cpe;
     }
   }
 
-  private void prepareContext() throws CommandProcessorException {
-    if (context != null && context.getExplainAnalyze() != 
AnalyzeState.RUNNING) {
-      // close the existing ctx etc before compiling a new query, but does not 
destroy driver
-      closeInProcess(false);
-    }
+  private void runInternal(String command, boolean alreadyCompiled) throws 
CommandProcessorException {
+    DriverState.setDriverState(driverState);
+    setInitialStateForRun(alreadyCompiled);
 
+    // a flag that helps to set the correct driver state in finally block by 
tracking if
+    // the method has been returned by an error or not.
+    boolean isFinishedWithError = true;
     try {
-      if (context == null) {
-        context = new Context(driverContext.getConf());
+      HiveDriverRunHookContext hookContext = new 
HiveDriverRunHookContextImpl(driverContext.getConf(),
+          alreadyCompiled ? context.getCmd() : command);
+      runPreDriverHooks(hookContext);
+
+      if (!alreadyCompiled) {
+        compileInternal(command, true);
+      } else {
+        
driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime());
       }
-    } catch (IOException e) {
-      throw new CommandProcessorException(e);
-    }
 
-    context.setHiveTxnManager(driverContext.getTxnManager());
-    context.setStatsSource(driverContext.getStatsSource());
-    context.setHDFSCleanup(true);
+      // Reset the PerfLogger so that it doesn't retain any previous values.
+      // Any value from compilation phase can be obtained through the map set 
in queryDisplay during compilation.
+      PerfLogger perfLogger = SessionState.getPerfLogger(true);
 
-    driverTxnHandler.setContext(context);
-  }
+      // the reason that we set the txn manager for the cxt here is because 
each query has its own ctx object.
+      // The txn mgr is shared across the same instance of Driver, which can 
run multiple queries.
+      context.setHiveTxnManager(driverContext.getTxnManager());
 
-  private void setQueryId() {
-    String queryId = 
Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ?
-        QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId();
+      DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring 
the lock.", null, null);
 
-    SparkSession ss = SessionState.get().getSparkSession();
-    if (ss != null) {
-      ss.onQuerySubmission(queryId);
-    }
-    driverContext.getQueryDisplay().setQueryId(queryId);
+      lockAndRespond();
+      validateTxnListState();
+      execute();
+      driverTxnHandler.handleTransactionAfterExecution();
 
-    setTriggerContext(queryId);
-  }
+      
driverContext.getQueryDisplay().setPerfLogStarts(QueryDisplay.Phase.EXECUTION, 
perfLogger.getStartTimes());
+      
driverContext.getQueryDisplay().setPerfLogEnds(QueryDisplay.Phase.EXECUTION, 
perfLogger.getEndTimes());
 
-  private void setTriggerContext(String queryId) {
-    long queryStartTime;
-    // query info is created by SQLOperation which will have start time of the 
operation. When JDBC Statement is not
-    // used queryInfo will be null, in which case we take creation of Driver 
instance as query start time (which is also
-    // the time when query display object is created)
-    if (driverContext.getQueryInfo() != null) {
-      queryStartTime = driverContext.getQueryInfo().getBeginTime();
-    } else {
-      queryStartTime = driverContext.getQueryDisplay().getQueryStartTime();
+      runPostDriverHooks(hookContext);
+      isFinishedWithError = false;
+    } finally {
+      if (driverState.isAborted()) {
+        closeInProcess(true);
+      } else {
+        releaseResources();
+      }
+
+      driverState.executionFinishedWithLocking(isFinishedWithError);
     }
-    WmContext wmContext = new WmContext(queryStartTime, queryId);
-    context.setWmContext(wmContext);
-  }
 
-  @Override
-  public HiveConf getConf() {
-    return driverContext.getConf();
+    SessionState.getPerfLogger().cleanupPerfLogMetrics();
   }
 
-  /**
-   * @return The current query plan associated with this Driver, if any.
-   */
-  @Override
-  public QueryPlan getPlan() {
-    return driverContext.getPlan();
+  private void setInitialStateForRun(boolean alreadyCompiled) throws 
CommandProcessorException {
+    driverState.lock();
+    try {
+      if (alreadyCompiled) {
+        if (driverState.isCompiled()) {
+          driverState.executing();
+        } else {
+          String errorMessage = "FAILED: Precompiled query has been cancelled 
or closed.";
+          CONSOLE.printError(errorMessage);
+          throw DriverUtils.createProcessorException(driverContext, 12, 
errorMessage, null, null);
+        }
+      } else {
+        driverState.compiling();
+      }
+    } finally {
+      driverState.unlock();
+    }
   }
 
-  /**
-   * @return The current FetchTask associated with the Driver's plan, if any.
-   */
-  @Override
-  public FetchTask getFetchTask() {
-    return driverContext.getFetchTask();
+  private void runPreDriverHooks(HiveDriverRunHookContext hookContext) throws 
CommandProcessorException {
+    try {
+      driverContext.getHookRunner().runPreDriverHooks(hookContext);
+    } catch (Exception e) {
+      String errorMessage = "FAILED: Hive Internal Error: " + 
Utilities.getNameMessage(e);
+      CONSOLE.printError(errorMessage + "\n" + 
StringUtils.stringifyException(e));
+      throw DriverUtils.createProcessorException(driverContext, 12, 
errorMessage,
+          ErrorMsg.findSQLState(e.getMessage()), e);
+    }
   }
 
-  public void releaseLocksAndCommitOrRollback(boolean commit) throws 
LockException {
-    releaseLocksAndCommitOrRollback(commit, driverContext.getTxnManager());
-  }
+  public void lockAndRespond() throws CommandProcessorException {
+    // Assumes the query has already been compiled
+    if (driverContext.getPlan() == null) {
+      throw new IllegalStateException(
+          "No previously compiled query for driver - queryId=" + 
driverContext.getQueryState().getQueryId());
+    }
 
-  /**
-   * @param commit if there is an open transaction and if true, commit,
-   *               if false rollback.  If there is no open transaction this 
parameter is ignored.
-   * @param txnManager an optional existing transaction manager retrieved 
earlier from the session
-   *
-   **/
-  @VisibleForTesting
-  public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager 
txnManager) throws LockException {
-    driverTxnHandler.releaseLocksAndCommitOrRollback(commit, txnManager);
+    try {
+      driverTxnHandler.acquireLocksIfNeeded();
+    } catch (CommandProcessorException cpe) {
+      driverTxnHandler.rollback(cpe);
+      throw cpe;
+    }
   }
 
-  /**
-   * Release some resources after a query is executed
-   * while keeping the result around.
-   */
-  public void releaseResources() {
-    releasePlan();
-    releaseTaskQueue();
+  private void validateTxnListState() throws CommandProcessorException {
+    try {
+      if (!driverTxnHandler.isValidTxnListState()) {
+        LOG.warn("Reexecuting after acquiring locks, since snapshot was 
outdated.");
+        // Snapshot was outdated when locks were acquired, hence regenerate 
context,
+        // txn list and retry (see ReExecutionRetryLockPlugin)
+        try {
+          driverTxnHandler.releaseLocksAndCommitOrRollback(false);

Review comment:
       This will collide with @mustafaiman's concurrent pull request. You might have 
to rebase this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to