This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 509f9e0fec HIVE-26121: Hive transaction rollback should be thread-safe (Denys Kuzmenko, reviewed by Peter Vary)
509f9e0fec is described below

commit 509f9e0fec5266e75b924c270bf78116b504275e
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Apr 13 11:33:13 2022 +0200

    HIVE-26121: Hive transaction rollback should be thread-safe (Denys Kuzmenko, reviewed by Peter Vary)
    
    Closes #3181
---
 .../hive/jdbc/miniHS2/TestHiveServer2Acid.java     |  43 +++++++-
 .../org/apache/hadoop/hive/ql/DriverState.java     |   4 +-
 .../apache/hadoop/hive/ql/DriverTxnHandler.java    |   9 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       | 118 ++++++++-------------
 4 files changed, 93 insertions(+), 81 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
index ea2cae5822..062de96307 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
@@ -30,6 +30,7 @@ import org.apache.hive.service.cli.OperationState;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.SessionHandle;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -72,6 +73,32 @@ public class TestHiveServer2Acid {
     }
   }
 
+  @Test
+  public void testCancelOperation() throws Exception {
+    String tableName = "TestHiveServer2TestConnection";
+    CLIServiceClient serviceClient = miniHS2.getServiceClient();
+    SessionHandle sessHandle = serviceClient.openSession("foo", "bar");
+    serviceClient.executeStatement(sessHandle, "DROP TABLE IF EXISTS " + 
tableName, confOverlay);
+    serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " 
(id INT)", confOverlay);
+    serviceClient.executeStatement(sessHandle, "insert into " + tableName + " 
values(5)", confOverlay);
+
+    serviceClient.executeStatement(sessHandle,
+        "create temporary function sleepMsUDF as '" + 
TestHiveServer2Acid.SleepMsUDF.class.getName() + "'",
+        confOverlay);
+    OperationHandle opHandle = serviceClient
+        .executeStatementAsync(sessHandle, "select sleepMsUDF(id, 1000), id 
from " + tableName, confOverlay);
+    serviceClient.cancelOperation(opHandle);
+    
+    assertOperationWasCancelled(serviceClient, opHandle);
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(
+        miniHS2.getHiveConf(), 
+        "select count(*) from HIVE_LOCKS"));
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(
+        miniHS2.getHiveConf(), 
+        "select 1 from TXNS where TXN_ID = (select max(TXN_ID) from TXNS) and 
TXN_STATE='a'"));
+    serviceClient.closeSession(sessHandle);
+  }
+
   /**
    * Test overlapping async queries in one session.
    * Since TxnManager is shared in the session this can cause all kind of 
trouble.
@@ -105,16 +132,26 @@ public class TestHiveServer2Acid {
     serviceClient.closeSession(sessHandle);
   }
 
-  private void assertOperationFinished(CLIServiceClient serviceClient, 
OperationHandle opHandle)
+  private void assertOperationFinished(CLIServiceClient serviceClient, 
OperationHandle opHandle) 
+      throws HiveSQLException, InterruptedException {
+    assertOperationStatus(serviceClient, opHandle, OperationState.FINISHED);
+  }
+
+  private void assertOperationWasCancelled(CLIServiceClient serviceClient, 
OperationHandle opHandle)
+    throws HiveSQLException, InterruptedException {
+    assertOperationStatus(serviceClient, opHandle, OperationState.CANCELED);
+  }
+  
+  private void assertOperationStatus(CLIServiceClient serviceClient, 
OperationHandle opHandle, OperationState opState)
       throws InterruptedException, HiveSQLException {
     OperationState pStatus = OperationState.RUNNING;
     for (int i = 0; i < 10; i++) {
       Thread.sleep(100);
       pStatus = serviceClient.getOperationStatus(opHandle, false).getState();
-      if (pStatus == OperationState.FINISHED) {
+      if (pStatus == opState) {
         break;
       }
     }
-    assertEquals(OperationState.FINISHED, pStatus);
+    assertEquals(opState, pStatus);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
index 6e0355d85e..72f9c2e606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
@@ -132,7 +132,9 @@ public class DriverState {
   public void executionFinishedWithLocking(boolean wasError) {
     lock();
     try {
-      driverState = wasError ? State.ERROR : State.EXECUTED;
+      if (!isDestroyed()) {
+        driverState = wasError ? State.ERROR : State.EXECUTED;
+      }
     } finally {
       unlock();
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
index 09fc767e20..4c925ad9e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
@@ -570,7 +570,10 @@ class DriverTxnHandler {
     txnRollbackRunner = null;
   }
 
-  void endTransactionAndCleanup(boolean commit, HiveTxnManager txnManager) 
throws LockException {
+  // When Hive query is being interrupted via cancel request, both the 
background pool thread (HiveServer2-Background), 
+  // executing the query, and the HttpHandler thread (HiveServer2-Handler), 
running the HiveSession.cancelOperation logic, 
+  // might call the below method concurrently. To prevent a race condition, 
marking it as synchronized.
+  synchronized void endTransactionAndCleanup(boolean commit, HiveTxnManager 
txnManager) throws LockException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
 
@@ -611,6 +614,8 @@ class DriverTxnHandler {
     if (context != null && context.getHiveLocks() != null) {
       hiveLocks.addAll(context.getHiveLocks());
     }
-    txnManager.releaseLocks(hiveLocks);
+    if (!hiveLocks.isEmpty()) {
+      txnManager.releaseLocks(hiveLocks);
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 300b52b4a8..fd1b508fd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -60,6 +60,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -76,7 +77,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * An implementation of HiveTxnManager that stores the transactions in the 
metastore database.
@@ -92,6 +92,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks 
exactly as autoCommit=true
  * from end user poit of view). See more at {@link #isExplicitTransaction}.
  */
+@NotThreadSafe
 public final class DbTxnManager extends HiveTxnManagerImpl {
 
   static final private String CLASS_NAME = DbTxnManager.class.getName();
@@ -169,7 +170,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   private static ScheduledExecutorService heartbeatExecutorService = null;
   private ScheduledFuture<?> heartbeatTask = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
-  private final ReentrantLock heartbeatTaskLock = new ReentrantLock();
   //Contains database under replication name for hive replication transactions 
(dump and load operation)
   private String replPolicy;
 
@@ -267,7 +267,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * be read by a different threads than one writing it, thus it's {@code 
volatile}
    */
   @Override
-  public HiveLockManager getLockManager() throws LockException {
+  public HiveLockManager getLockManager() {
     init();
     if (lockMgr == null) {
       lockMgr = new DbLockManager(conf, this);
@@ -470,16 +470,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
-
   @Override
-  public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+  public void releaseLocks(List<HiveLock> hiveLocks) {
     if (lockMgr != null) {
       stopHeartbeat();
       lockMgr.releaseLocks(hiveLocks);
     }
   }
 
-  private void clearLocksAndHB() throws LockException {
+  private void clearLocksAndHB() {
     lockMgr.clearLocalLockRecords();
     stopHeartbeat();
   }
@@ -574,30 +573,24 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     if (!isTxnOpen()) {
       throw new RuntimeException("Attempt to rollback before opening a 
transaction");
     }
-    stopHeartbeat();
-
     try {
-      lockMgr.clearLocalLockRecords();
+      clearLocksAndHB();
       LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
-
-      // Re-checking as txn could have been closed, in the meantime, by a 
competing thread.
-      if (isTxnOpen()) {
-        if (replPolicy != null) {
-          getMS().replRollbackTxn(txnId, replPolicy, TxnType.DEFAULT);
-        } else {
-          getMS().rollbackTxn(txnId);
-        }
+      
+      if (replPolicy != null) {
+        getMS().replRollbackTxn(txnId, replPolicy, TxnType.DEFAULT);
       } else {
-        LOG.warn("Transaction is already closed.");
+        getMS().rollbackTxn(txnId);
       }
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, 
JavaUtils.txnIdToString(txnId));
+    
     } catch(TxnAbortedException e) {
       throw new LockException(e, ErrorMsg.TXN_ABORTED, 
JavaUtils.txnIdToString(txnId));
+    
     } catch (TException e) {
-      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
-          e);
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     } finally {
       resetTxnInfo();
     }
@@ -678,21 +671,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException("error while getting current user,", e);
     }
 
-    try {
-      heartbeatTaskLock.lock();
-      if (heartbeatTask != null) {
-        throw new IllegalStateException("Heartbeater is already started.");
-      }
-
-      Heartbeater heartbeater = new Heartbeater(this, conf, queryId, 
currentUser);
-      heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, 
heartbeater);
-      LOG.debug("Started heartbeat with delay/interval = " + initialDelay + 
"/" + heartbeatInterval +
-          " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
-
-      return heartbeater;
-    } finally {
-      heartbeatTaskLock.unlock();
-    }
+    Heartbeater heartbeater = new Heartbeater(this, conf, queryId, 
currentUser);
+    heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, 
heartbeater);
+    LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" 
+ heartbeatInterval +
+      " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
+    return heartbeater;
   }
 
   private ScheduledFuture<?> startHeartbeat(long initialDelay, long 
heartbeatInterval, Runnable heartbeater) {
@@ -710,49 +693,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     return task;
   }
 
-  private void stopHeartbeat() {
-    if (heartbeatTask == null) {
-      // avoid unnecessary locking if the field is null
-      return;
-    }
-
-    boolean isLockAcquired = false;
-    try {
-      // The lock should not be held by other thread trying to stop the 
heartbeat for more than 31 seconds
-      isLockAcquired = heartbeatTaskLock.tryLock(31000, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      // safe to go on
-    }
-
-    try {
-      if (isLockAcquired && heartbeatTask != null) {
-        heartbeatTask.cancel(true);
-        long startTime = System.currentTimeMillis();
-        long sleepInterval = 100;
-        while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
-          // We will wait for 30 seconds for the task to be cancelled.
-          // If it's still not cancelled (unlikely), we will just move on.
-          long now = System.currentTimeMillis();
-          if (now - startTime > 30000) {
-            LOG.warn("Heartbeat task cannot be cancelled for unknown reason. 
QueryId: " + queryId);
-            break;
-          }
-          try {
-            Thread.sleep(sleepInterval);
-          } catch (InterruptedException e) {
-          }
-          sleepInterval *= 2;
+  // To prevent NullPointerException due to race condition, marking it as 
synchronized.
+  private synchronized void stopHeartbeat() {
+    if (heartbeatTask != null) {
+      heartbeatTask.cancel(true);
+      
+      long startTime = System.currentTimeMillis();
+      long sleepInterval = 100;
+      
+      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+        // We will wait for 30 seconds for the task to be cancelled.
+        // If it's still not cancelled (unlikely), we will just move on.
+        long now = System.currentTimeMillis();
+        if (now - startTime > 30000) {
+          LOG.error("Heartbeat task cannot be cancelled for unknown reason. 
QueryId: " + queryId);
+          break;
         }
-        if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
-          LOG.info("Stopped heartbeat for query: " + queryId);
+        try {
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e) {
         }
-        heartbeatTask = null;
-        queryId = null;
+        sleepInterval *= 2;
       }
-    } finally {
-      if (isLockAcquired) {
-        heartbeatTaskLock.unlock();
+      if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+        LOG.info("Stopped heartbeat for query: " + queryId);
       }
+      heartbeatTask = null;
+      queryId = null;
     }
   }
 
@@ -886,6 +853,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
     return false;
   }
+  
   @Override
   protected void destruct() {
     try {
@@ -903,7 +871,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
-  private void init() throws LockException {
+  private void init() {
     if (conf == null) {
       throw new RuntimeException("Must call setHiveConf before any other 
methods.");
     }

Reply via email to