This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 509f9e0fec HIVE-26121: Hive transaction rollback should be thread-safe (Denys Kuzmenko, reviewed by Peter Vary)
509f9e0fec is described below
commit 509f9e0fec5266e75b924c270bf78116b504275e
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Apr 13 11:33:13 2022 +0200
HIVE-26121: Hive transaction rollback should be thread-safe (Denys Kuzmenko, reviewed by Peter Vary)
Closes #3181
---
.../hive/jdbc/miniHS2/TestHiveServer2Acid.java | 43 +++++++-
.../org/apache/hadoop/hive/ql/DriverState.java | 4 +-
.../apache/hadoop/hive/ql/DriverTxnHandler.java | 9 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 118 ++++++++-------------
4 files changed, 93 insertions(+), 81 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
index ea2cae5822..062de96307 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestHiveServer2Acid.java
@@ -30,6 +30,7 @@ import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.SessionHandle;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -72,6 +73,32 @@ public class TestHiveServer2Acid {
}
}
+ @Test
+ public void testCancelOperation() throws Exception {
+ String tableName = "TestHiveServer2TestConnection";
+ CLIServiceClient serviceClient = miniHS2.getServiceClient();
+ SessionHandle sessHandle = serviceClient.openSession("foo", "bar");
+ serviceClient.executeStatement(sessHandle, "DROP TABLE IF EXISTS " +
tableName, confOverlay);
+ serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + "
(id INT)", confOverlay);
+ serviceClient.executeStatement(sessHandle, "insert into " + tableName + "
values(5)", confOverlay);
+
+ serviceClient.executeStatement(sessHandle,
+ "create temporary function sleepMsUDF as '" +
TestHiveServer2Acid.SleepMsUDF.class.getName() + "'",
+ confOverlay);
+ OperationHandle opHandle = serviceClient
+ .executeStatementAsync(sessHandle, "select sleepMsUDF(id, 1000), id
from " + tableName, confOverlay);
+ serviceClient.cancelOperation(opHandle);
+
+ assertOperationWasCancelled(serviceClient, opHandle);
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(
+ miniHS2.getHiveConf(),
+ "select count(*) from HIVE_LOCKS"));
+ Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(
+ miniHS2.getHiveConf(),
+ "select 1 from TXNS where TXN_ID = (select max(TXN_ID) from TXNS) and
TXN_STATE='a'"));
+ serviceClient.closeSession(sessHandle);
+ }
+
/**
* Test overlapping async queries in one session.
* Since TxnManager is shared in the session this can cause all kind of
trouble.
@@ -105,16 +132,26 @@ public class TestHiveServer2Acid {
serviceClient.closeSession(sessHandle);
}
- private void assertOperationFinished(CLIServiceClient serviceClient,
OperationHandle opHandle)
+ private void assertOperationFinished(CLIServiceClient serviceClient,
OperationHandle opHandle)
+ throws HiveSQLException, InterruptedException {
+ assertOperationStatus(serviceClient, opHandle, OperationState.FINISHED);
+ }
+
+ private void assertOperationWasCancelled(CLIServiceClient serviceClient,
OperationHandle opHandle)
+ throws HiveSQLException, InterruptedException {
+ assertOperationStatus(serviceClient, opHandle, OperationState.CANCELED);
+ }
+
+ private void assertOperationStatus(CLIServiceClient serviceClient,
OperationHandle opHandle, OperationState opState)
throws InterruptedException, HiveSQLException {
OperationState pStatus = OperationState.RUNNING;
for (int i = 0; i < 10; i++) {
Thread.sleep(100);
pStatus = serviceClient.getOperationStatus(opHandle, false).getState();
- if (pStatus == OperationState.FINISHED) {
+ if (pStatus == opState) {
break;
}
}
- assertEquals(OperationState.FINISHED, pStatus);
+ assertEquals(opState, pStatus);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
index 6e0355d85e..72f9c2e606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverState.java
@@ -132,7 +132,9 @@ public class DriverState {
public void executionFinishedWithLocking(boolean wasError) {
lock();
try {
- driverState = wasError ? State.ERROR : State.EXECUTED;
+ if (!isDestroyed()) {
+ driverState = wasError ? State.ERROR : State.EXECUTED;
+ }
} finally {
unlock();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
index 09fc767e20..4c925ad9e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
@@ -570,7 +570,10 @@ class DriverTxnHandler {
txnRollbackRunner = null;
}
- void endTransactionAndCleanup(boolean commit, HiveTxnManager txnManager)
throws LockException {
+ // When Hive query is being interrupted via cancel request, both the
background pool thread (HiveServer2-Background),
+ // executing the query, and the HttpHandler thread (HiveServer2-Handler),
running the HiveSession.cancelOperation logic,
+ // might call the below method concurrently. To prevent a race condition,
marking it as synchronized.
+ synchronized void endTransactionAndCleanup(boolean commit, HiveTxnManager
txnManager) throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
@@ -611,6 +614,8 @@ class DriverTxnHandler {
if (context != null && context.getHiveLocks() != null) {
hiveLocks.addAll(context.getHiveLocks());
}
- txnManager.releaseLocks(hiveLocks);
+ if (!hiveLocks.isEmpty()) {
+ txnManager.releaseLocks(hiveLocks);
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 300b52b4a8..fd1b508fd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -60,6 +60,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -76,7 +77,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
/**
* An implementation of HiveTxnManager that stores the transactions in the
metastore database.
@@ -92,6 +92,7 @@ import java.util.concurrent.locks.ReentrantLock;
* implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks
exactly as autoCommit=true
* from end user poit of view). See more at {@link #isExplicitTransaction}.
*/
+@NotThreadSafe
public final class DbTxnManager extends HiveTxnManagerImpl {
static final private String CLASS_NAME = DbTxnManager.class.getName();
@@ -169,7 +170,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
private static ScheduledExecutorService heartbeatExecutorService = null;
private ScheduledFuture<?> heartbeatTask = null;
private static final int SHUTDOWN_HOOK_PRIORITY = 0;
- private final ReentrantLock heartbeatTaskLock = new ReentrantLock();
//Contains database under replication name for hive replication transactions
(dump and load operation)
private String replPolicy;
@@ -267,7 +267,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* be read by a different threads than one writing it, thus it's {@code
volatile}
*/
@Override
- public HiveLockManager getLockManager() throws LockException {
+ public HiveLockManager getLockManager() {
init();
if (lockMgr == null) {
lockMgr = new DbLockManager(conf, this);
@@ -470,16 +470,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
}
-
@Override
- public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+ public void releaseLocks(List<HiveLock> hiveLocks) {
if (lockMgr != null) {
stopHeartbeat();
lockMgr.releaseLocks(hiveLocks);
}
}
- private void clearLocksAndHB() throws LockException {
+ private void clearLocksAndHB() {
lockMgr.clearLocalLockRecords();
stopHeartbeat();
}
@@ -574,30 +573,24 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
if (!isTxnOpen()) {
throw new RuntimeException("Attempt to rollback before opening a
transaction");
}
- stopHeartbeat();
-
try {
- lockMgr.clearLocalLockRecords();
+ clearLocksAndHB();
LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
-
- // Re-checking as txn could have been closed, in the meantime, by a
competing thread.
- if (isTxnOpen()) {
- if (replPolicy != null) {
- getMS().replRollbackTxn(txnId, replPolicy, TxnType.DEFAULT);
- } else {
- getMS().rollbackTxn(txnId);
- }
+
+ if (replPolicy != null) {
+ getMS().replRollbackTxn(txnId, replPolicy, TxnType.DEFAULT);
} else {
- LOG.warn("Transaction is already closed.");
+ getMS().rollbackTxn(txnId);
}
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION,
JavaUtils.txnIdToString(txnId));
+
} catch(TxnAbortedException e) {
throw new LockException(e, ErrorMsg.TXN_ABORTED,
JavaUtils.txnIdToString(txnId));
+
} catch (TException e) {
- throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
- e);
+ throw new
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
} finally {
resetTxnInfo();
}
@@ -678,21 +671,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException("error while getting current user,", e);
}
- try {
- heartbeatTaskLock.lock();
- if (heartbeatTask != null) {
- throw new IllegalStateException("Heartbeater is already started.");
- }
-
- Heartbeater heartbeater = new Heartbeater(this, conf, queryId,
currentUser);
- heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval,
heartbeater);
- LOG.debug("Started heartbeat with delay/interval = " + initialDelay +
"/" + heartbeatInterval +
- " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
-
- return heartbeater;
- } finally {
- heartbeatTaskLock.unlock();
- }
+ Heartbeater heartbeater = new Heartbeater(this, conf, queryId,
currentUser);
+ heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval,
heartbeater);
+ LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/"
+ heartbeatInterval +
+ " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
+ return heartbeater;
}
private ScheduledFuture<?> startHeartbeat(long initialDelay, long
heartbeatInterval, Runnable heartbeater) {
@@ -710,49 +693,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
return task;
}
- private void stopHeartbeat() {
- if (heartbeatTask == null) {
- // avoid unnecessary locking if the field is null
- return;
- }
-
- boolean isLockAcquired = false;
- try {
- // The lock should not be held by other thread trying to stop the
heartbeat for more than 31 seconds
- isLockAcquired = heartbeatTaskLock.tryLock(31000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // safe to go on
- }
-
- try {
- if (isLockAcquired && heartbeatTask != null) {
- heartbeatTask.cancel(true);
- long startTime = System.currentTimeMillis();
- long sleepInterval = 100;
- while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
- // We will wait for 30 seconds for the task to be cancelled.
- // If it's still not cancelled (unlikely), we will just move on.
- long now = System.currentTimeMillis();
- if (now - startTime > 30000) {
- LOG.warn("Heartbeat task cannot be cancelled for unknown reason.
QueryId: " + queryId);
- break;
- }
- try {
- Thread.sleep(sleepInterval);
- } catch (InterruptedException e) {
- }
- sleepInterval *= 2;
+ // To prevent NullPointerException due to race condition, marking it as
synchronized.
+ private synchronized void stopHeartbeat() {
+ if (heartbeatTask != null) {
+ heartbeatTask.cancel(true);
+
+ long startTime = System.currentTimeMillis();
+ long sleepInterval = 100;
+
+ while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+ // We will wait for 30 seconds for the task to be cancelled.
+ // If it's still not cancelled (unlikely), we will just move on.
+ long now = System.currentTimeMillis();
+ if (now - startTime > 30000) {
+ LOG.error("Heartbeat task cannot be cancelled for unknown reason.
QueryId: " + queryId);
+ break;
}
- if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
- LOG.info("Stopped heartbeat for query: " + queryId);
+ try {
+ Thread.sleep(sleepInterval);
+ } catch (InterruptedException e) {
}
- heartbeatTask = null;
- queryId = null;
+ sleepInterval *= 2;
}
- } finally {
- if (isLockAcquired) {
- heartbeatTaskLock.unlock();
+ if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+ LOG.info("Stopped heartbeat for query: " + queryId);
}
+ heartbeatTask = null;
+ queryId = null;
}
}
@@ -886,6 +853,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
return false;
}
+
@Override
protected void destruct() {
try {
@@ -903,7 +871,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
}
- private void init() throws LockException {
+ private void init() {
if (conf == null) {
throw new RuntimeException("Must call setHiveConf before any other
methods.");
}