Repository: hive Updated Branches: refs/heads/branch-1 51f6e654a -> f1c75b5f0
HIVE-13013 - Further Improve concurrency in TxnHandler (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f1c75b5f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f1c75b5f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f1c75b5f Branch: refs/heads/branch-1 Commit: f1c75b5f01f02cd7490abc279d8110ea144c0a6d Parents: 51f6e65 Author: Eugene Koifman <[email protected]> Authored: Mon Feb 29 11:33:34 2016 -0800 Committer: Eugene Koifman <[email protected]> Committed: Mon Feb 29 11:33:34 2016 -0800 ---------------------------------------------------------------------- .../metastore/txn/CompactionTxnHandler.java | 2 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 620 +++++++++++-------- .../metastore/txn/TestTxnHandlerNegative.java | 9 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 2 +- 4 files changed, 355 insertions(+), 278 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f1c75b5f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index cf14b4e..4d736b9 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -840,7 +840,7 @@ public class CompactionTxnHandler extends TxnHandler { * but what abount markCleaned() which is called when table is had been deleted... 
*/ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw - //todo: this shoudl take "comment" as parameter to set in CC_META_INFO to provide some context for the failure + //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure try { Connection dbConn = null; Statement stmt = null; http://git-wip-us.apache.org/repos/asf/hive/blob/f1c75b5f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2411c3e..fd66415 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -32,8 +32,8 @@ import org.apache.commons.dbcp.PoolingDataSource; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.shims.ShimLoader; @@ -45,6 +45,7 @@ import java.io.IOException; import java.sql.*; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * A handler to answer transaction related calls that come into the metastore @@ -60,6 +61,28 @@ import java.util.concurrent.TimeUnit; * For locks that are part of transaction, we set this 0 (would rather set it to NULL but * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding * transaction in TXNS. 
+ * + * In general there can be multiple metastores where this logic can execute, thus the DB is + * used to ensure proper mutexing of operations. + * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is + * used to properly sequence operations. Most notably: + * 1. various sequence IDs are generated with aid of this mutex + * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state + * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all + * per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row. + * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks. + * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock + * can be granted, no other (strictly speaking "earlier") lock can change state. + * + * The exception to this is Derby which doesn't support proper S4U. Derby is always running embedded + * (this is the only supported configuration for Derby) + * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to properly sequence the operations. + * + * {@link #derbyLock} + + * If we ever decide to run remote Derby server, according to + * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be + * serialized, so that would also work though has not been tested. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -102,8 +125,6 @@ public class TxnHandler { static private DataSource connPool; static private boolean doRetryOnConnPool = false; - private final static Object lockLock = new Object(); // Random object to lock on for the lock - // method /** * Number of consecutive deadlocks we have seen @@ -120,11 +141,11 @@ public class TxnHandler { private final long retryInterval; private final int retryLimit; private int retryNum; + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); - // DEADLOCK DETECTION AND HANDLING - // A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock - // between simultaneous accesses. ALWAYS access TXN_COMPONENTS before HIVE_LOCKS . - // // Private methods should never catch SQLException and then throw MetaException. The public // methods depend on SQLException coming back so they can detect and handle deadlocks. 
Private // methods should only throw MetaException when they explicitly know there's a logic error and @@ -139,15 +160,21 @@ public class TxnHandler { this.conf = conf; checkQFileTestHack(); - + + Connection dbConn = null; // Set up the JDBC connection pool try { setupJdbcConnectionPool(conf); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); } catch (SQLException e) { String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); LOG.error(msg); throw new RuntimeException(e); } + finally { + closeDbConn(dbConn); + } timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); buildJumpTable(); @@ -312,11 +339,12 @@ public class TxnHandler { Statement stmt = null; ResultSet rs = null; try { + lockInternal(); /** * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running, thus either - * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents + * Also, advancing the counter must work when multiple metastores are running. + * SELECT ... FOR UPDATE is used to prevent * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. * * In the current design, there can be several metastore instances running in a given Warehouse. @@ -329,14 +357,14 @@ public class TxnHandler { * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This * set could support a write-through cache for added performance. */ - dbConn = getDbConn(getRequiredIsolationLevel()); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Make sure the user has not requested an insane amount of txns. 
int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); - String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID"); + String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -372,6 +400,7 @@ public class TxnHandler { + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); + unlockInternal(); } } catch (RetryException e) { return openTxns(rqst); @@ -383,7 +412,8 @@ public class TxnHandler { try { Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -400,6 +430,7 @@ public class TxnHandler { + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + unlockInternal(); } } catch (RetryException e) { abortTxn(rqst); @@ -412,33 +443,24 @@ public class TxnHandler { try { Connection dbConn = null; Statement stmt = null; + ResultSet lockHandle = null; try { + lockInternal(); /** - * This has to run at SERIALIZABLE to make no concurrent attempt to acquire locks (insert into HIVE_LOCKS) - * can happen. Otherwise we may end up with orphaned locks. While lock() and commitTxn() should not - * normally run concurrently (for same txn) but could due to bugs in the client which could then - * (w/o SERIALIZABLE) corrupt internal transaction manager state. Also competes with abortTxn() - * - * Sketch of an improvement: - * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state. - * commitTxn() would mark the txn 'c' in TXNS in an independent txn. 
Other operation like - * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would - * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the - * entry for this txn in TXNS in 'c' acts like a monitor. - * Since the move to 'c' state is in one txn (to make it visible) and the rest of the - * operations in another (could even be made separate txns), there is a possibility of failure - * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply 'c' state txns. - * - * Or perhaps Select * TXNS where txn_id = " + txnid; for update + * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other + * operation can change this txn (such acquiring locks). While lock() and commitTxn() + * should not normally run concurrently (for same txn) but could due to bugs in the client + * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to - // commit it, but it does two things. One, it makes sure the transaction is still valid. - // Two, it avoids the race condition where we time out between now and when we actually - // commit the transaction below. And it does this all in a dead-lock safe way by - // committing the heartbeat back to the database. - heartbeatTxn(dbConn, txnid); + + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + //this also ensures that txn is still there and in expected state (hasn't been timed out) + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. 
@@ -471,35 +493,213 @@ public class TxnHandler { throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(lockHandle, stmt, dbConn); + unlockInternal(); } } catch (RetryException e) { commitTxn(rqst); } } - public LockResponse lock(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + /** + * As much as possible (i.e. in absence of retries) we want both operations to be done on the same + * connection (but separate transactions). This avoid some flakiness in BONECP where if you + * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one + * doesn't see results of the first. + */ + public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); + try { + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + } + catch(NoSuchLockException e) { + // This should never happen, as we just added the lock id + throw new MetaException("Couldn't find a lock we just created! " + e.getMessage()); + } + } + private static final class ConnectionLockIdPair { + private final Connection dbConn; + private final long extLockId; + private ConnectionLockIdPair(Connection dbConn, long extLockId) { + this.dbConn = dbConn; + this.extLockId = extLockId; + } + } + + /** + * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read + * and then executeUpdate(). One other alternative would be to actually update the row in TXNX but + * to the same value as before thus forcing db to acquire write lock for duration of the transaction. 
+ * + * There is no real reason to return the ResultSet here other than to make sure the reference to it + * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock + * to be released. + * @param txnState the state this txn is expected to be in. may be null + * @return null if no row was found + * @throws SQLException + * @throws MetaException + */ + private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : ""); + ResultSet rs = stmt.executeQuery(addForUpdateClause(query)); + if(rs.next()) { + return rs; + } + close(rs); + return null; + } + + /** + * This enters locks into the queue in {@link #LOCK_WAITING} mode. + * + * Isolation Level Notes: + * 1. We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes + * any 2 {@code enqueueLockWithRetry()} calls. + * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations + * @see #checkLockWithRetry(Connection, long, long) + */ + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + boolean success = false; + Connection dbConn = null; + try { + Statement stmt = null; + ResultSet rs = null; + ResultSet lockHandle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long txnid = rqst.getTxnid(); + stmt = dbConn.createStatement(); + if (isValidTxn(txnid)) { + //this also ensures that txn is still there in expected state + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + } + /** Get the next lock id. 
+ * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. Then 7 unblocks, + * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than one it's checking*/ + String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + if (txnid > 0) { + // For each component in this lock request, + // add an entry to the txn_components table + // This must be done before HIVE_LOCKS is accessed + for (LockComponent lc : rqst.getComponent()) { + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + s = "insert into TXN_COMPONENTS " + + "(tc_txnid, tc_database, tc_table, tc_partition) " + + "values (" + txnid + ", '" + dbName + "', " + + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? 
"null" : "'" + partName + "'") + ")"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } + + long intLockId = 0; + for (LockComponent lc : rqst.getComponent()) { + intLockId++; + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + LockType lockType = lc.getType(); + char lockChar = 'z'; + switch (lockType) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + long now = getDbTime(dbConn); + s = "insert into HIVE_LOCKS " + + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + + " values (" + extLockId + ", " + + +intLockId + "," + txnid + ", '" + + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + + ", " + (partName == null ? "null" : "'" + partName + "'") + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + (isValidTxn(txnid) ? 0 : now) + ", '" + + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + dbConn.commit(); + success = true; + return new ConnectionLockIdPair(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(lockHandle); + close(rs, stmt, null); + if (!success) { + /* This needs to return a "live" connection to be used by operation that follows it. + Thus it only closes Connection on failure/retry. 
*/ + closeDbConn(dbConn); + } + unlockInternal(); + } + } + catch(RetryException e) { + return enqueueLockWithRetry(rqst); + } + } + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - return lock(dbConn, rqst); + lockInternal(); + if(dbConn.isClosed()) { + //should only get here if retrying this op + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + } + dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + return checkLock(dbConn, extLockId); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "lock(" + rqst + ")"); + checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + unlockInternal(); closeDbConn(dbConn); } - } catch (RetryException e) { - return lock(rqst); + } + catch(RetryException e) { + return checkLockWithRetry(dbConn, extLockId, txnId); } } - /** * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one. * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change. @@ -521,6 +721,7 @@ public class TxnHandler { Connection dbConn = null; long extLockId = rqst.getLockid(); try { + lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks @@ -535,6 +736,9 @@ public class TxnHandler { else { heartbeatLock(dbConn, extLockId); } + //todo: strictly speaking there is a bug here. 
heartbeat*() commits but both heartbeat and + //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired + //extra heartbeat is logically harmless, but ... dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); return checkLock(dbConn, extLockId); } catch (SQLException e) { @@ -545,6 +749,7 @@ public class TxnHandler { JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + unlockInternal(); } } catch (RetryException e) { return checkLock(rqst); @@ -555,6 +760,9 @@ public class TxnHandler { /** * This would have been made simpler if all locks were associated with a txn. Then only txn needs to * be heartbeated, committed, etc. no need for client to track individual locks. + * When removing locks not associated with txn this potentially conflicts with + * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db. + * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { @@ -579,6 +787,8 @@ public class TxnHandler { //hl_txnid <> 0 means it's associated with a transaction String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" + " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))"; + //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where + //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -741,8 +951,9 @@ public class TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { - //todo: this is expensive call: at least 2 update queries per txn - //is this really worth it? 
+ //todo: do all updates in 1 SQL statement and check update count + //if update count is less than was requested, go into more expensive checks + //for each txn heartbeatTxn(dbConn, txn); } catch (NoSuchTxnException e) { nosuch.add(txn); @@ -771,11 +982,12 @@ public class TxnHandler { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(getRequiredIsolationLevel()); + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -842,6 +1054,7 @@ public class TxnHandler { } finally { closeStmt(stmt); closeDbConn(dbConn); + unlockInternal(); } } catch (RetryException e) { return compact(rqst); @@ -915,16 +1128,25 @@ public class TxnHandler { } } + private static void shouldNeverHappen(long txnid) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid)); + } public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; Statement stmt = null; + ResultSet lockHandle = null; try { try { + lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - // Heartbeat this first to make sure the transaction is still valid. 
- heartbeatTxn(dbConn, rqst.getTxnid()); + lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if(lockHandle == null) { + //ensures txn is still there and in expected state + ensureValidTxn(dbConn, rqst.getTxnid(), stmt); + shouldNeverHappen(rqst.getTxnid()); + } for (String partName : rqst.getPartitionnames()) { StringBuilder buff = new StringBuilder(); buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values ("); @@ -949,8 +1171,8 @@ public class TxnHandler { throw new MetaException("Unable to insert into from transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(lockHandle, stmt, dbConn); + unlockInternal(); } } catch (RetryException e) { addDynamicPartitions(rqst); @@ -996,13 +1218,15 @@ public class TxnHandler { protected Connection getDbConn(int isolationLevel) throws SQLException { int rc = doRetryOnConnPool ? 10 : 1; + Connection dbConn = null; while (true) { try { - Connection dbConn = connPool.getConnection(); + dbConn = connPool.getConnection(); dbConn.setAutoCommit(false); dbConn.setTransactionIsolation(isolationLevel); return dbConn; } catch (SQLException e){ + closeDbConn(dbConn); if ((--rc) <= 0) throw e; LOG.error("There is a problem with a connection from the pool, retrying(rc=" + rc + "): " + getMessage(e), e); @@ -1019,7 +1243,9 @@ public class TxnHandler { } protected void closeDbConn(Connection dbConn) { try { - if (dbConn != null && !dbConn.isClosed()) dbConn.close(); + if (dbConn != null && !dbConn.isClosed()) { + dbConn.close(); + } } catch (SQLException e) { LOG.warn("Failed to close db connection " + getMessage(e)); } @@ -1083,8 +1309,8 @@ public class TxnHandler { // Derby and newer MySQL driver use the new SQLTransactionRollbackException boolean sendRetrySignal = false; try { - if (dbProduct == null && conn != null) { - determineDatabaseProduct(conn); + if(dbProduct == null) { + throw new IllegalStateException("DB 
Type not determined yet."); } if (e instanceof SQLTransactionRollbackException || ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || @@ -1150,8 +1376,7 @@ public class TxnHandler { try { stmt = conn.createStatement(); String s; - DatabaseProduct prod = determineDatabaseProduct(conn); - switch (prod) { + switch (dbProduct) { case DERBY: s = "values current_timestamp"; break; @@ -1167,7 +1392,7 @@ public class TxnHandler { break; default: - String msg = "Unknown database product: " + prod.toString(); + String msg = "Unknown database product: " + dbProduct.toString(); LOG.error(msg); throw new MetaException(msg); } @@ -1203,16 +1428,15 @@ public class TxnHandler { * Determine the database product type * @param conn database connection * @return database product type - * @throws MetaException if the type cannot be determined or is unknown */ - protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException { + private DatabaseProduct determineDatabaseProduct(Connection conn) { if (dbProduct == null) { - try {//todo: make this work when conn == null + try { String s = conn.getMetaData().getDatabaseProductName(); if (s == null) { String msg = "getDatabaseProductName returns null, can't determine database product"; LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } else if (s.equals("Apache Derby")) { dbProduct = DatabaseProduct.DERBY; } else if (s.equals("Microsoft SQL Server")) { @@ -1226,13 +1450,13 @@ public class TxnHandler { } else { String msg = "Unrecognized database product name <" + s + ">"; LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } } catch (SQLException e) { String msg = "Unable to get database product name: " + e.getMessage(); LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } } return dbProduct; @@ -1404,7 +1628,7 @@ public class TxnHandler { // We may have already created the 
tables and thus don't need to redo it. if (!e.getMessage().contains("already exists")) { throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage()); + " testing: " + e.getMessage(), e); } } } @@ -1416,6 +1640,9 @@ public class TxnHandler { /** * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining * trasnactions in a batch on IOExceptions. + * Caller must rollback the transaction if not all transactions were aborted since this will not + * attempt to delete associated locks in this case. + * * @param dbConn An active connection * @param txnids list of transactions to abort * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were @@ -1429,17 +1656,12 @@ public class TxnHandler { if (txnids.isEmpty()) { return 0; } - if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) { - /** Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently - * which would cause them to become orphaned. - */ - throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation()); - } try { stmt = dbConn.createStatement(); - - // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS - StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); + //This is an update statement, thus at any Isolation level will take Write locks so will block + //all other ops using S4U on TXNS row. 
+ StringBuilder buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + + "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); boolean first = true; for (Long id : txnids) { if (first) first = false; @@ -1447,13 +1669,22 @@ public class TxnHandler { buf.append(id); } buf.append(')'); + if(max_heartbeat > 0) { + buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); + } LOG.debug("Going to execute update <" + buf.toString() + ">"); - stmt.executeUpdate(buf.toString()); + updateCnt = stmt.executeUpdate(buf.toString()); + if(updateCnt < txnids.size()) { + /** + * have to bail in this case since we don't know which transactions were not Aborted and + * thus don't know which locks to delete + * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and + * {@link #performTimeOuts()} + */ + return updateCnt; + } - //todo: seems like we should do this first and if it misses, don't bother with - //delete from HIVE_LOCKS since it will be rolled back - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + - "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); + buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); first = true; for (Long id : txnids) { if (first) first = false; @@ -1461,155 +1692,14 @@ public class TxnHandler { buf.append(id); } buf.append(')'); - if(max_heartbeat > 0) { - buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); - } LOG.debug("Going to execute update <" + buf.toString() + ">"); - updateCnt = stmt.executeUpdate(buf.toString()); - + stmt.executeUpdate(buf.toString()); } finally { closeStmt(stmt); } return updateCnt; } - /** - * Isolation Level Notes: - * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here. - * - * Ramblings: - * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC - * since this is just in Wait state. 
- * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of - * failure, the W locks will timeout if failure does not propagate to client in some way, or it - * will and client will Abort). - * Actually, whether we can do this depends on what happens when you try to get a lock and notice - * a conflicting locks in W mode do we wait in this case? if so it's a problem because while you - * are checking new locks someone may insert new W locks that you don't see... - * On the other hand, this attempts to be 'fair', i.e. process locks in order so could we assume - * that additional W locks will have higher IDs???? - * - * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn. - * This avoids contention on NEXT_LOCK_ID. The rest of the logic will be still need to be done at Serializable, I think, - * but it will not be updating the same row from 2 DB. - * - * Request a lock - * @param dbConn database connection - * @param rqst lock information - * @return information on whether the lock was acquired. - * @throws NoSuchTxnException - * @throws TxnAbortedException - */ - private LockResponse lock(Connection dbConn, LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { - // We want to minimize the number of concurrent lock requests being issued. If we do not we - // get a large number of deadlocks in the database, since this method has to both clean - // timedout locks and insert new locks. This synchronization barrier will not eliminate all - // deadlocks, and the code is still resilient in the face of a database deadlock. But it - // will reduce the number. This could have been done via a lock table command in the - // underlying database, but was not for two reasons. One, different databases have different - // syntax for lock table, making it harder to use. 
Two, that would lock the HIVE_LOCKS table - // and prevent other operations (such as committing transactions, showing locks, - // etc.) that should not interfere with this one. - synchronized (lockLock) { - Statement stmt = null; - ResultSet rs = null; - try { - long txnid = rqst.getTxnid(); - if (txnid > 0) { - // Heartbeat the transaction so we know it is valid and we avoid it timing out while we - // are locking. - heartbeatTxn(dbConn, txnid); - } - stmt = dbConn.createStatement(); - - /** Get the next lock id. - * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. - * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, - * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. Then 7 unblocks, - * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} - * doesn't block on locks acquired later than one it's checking*/ - String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - - if (txnid > 0) { - // For each component in this lock request, - // add an entry to the txn_components table - // This must be done before HIVE_LOCKS is accessed - - //Isolation note: - //the !wait option is not actually used anywhere. W/o that, - // if we make CompactionTxnHandler.markCleaned() not delete anything above certain txn_id - //then there is not reason why this insert into TXN_COMPONENTS needs to run at Serializable. 
- // - // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long - //as check lock is at serializable (or any other way to make sure it's exclusive) - for (LockComponent lc : rqst.getComponent()) { - String dbName = lc.getDbname(); - String tblName = lc.getTablename(); - String partName = lc.getPartitionname(); - s = "insert into TXN_COMPONENTS " + - "(tc_txnid, tc_database, tc_table, tc_partition) " + - "values (" + txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'") + ")"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - } - - long intLockId = 0; - for (LockComponent lc : rqst.getComponent()) { - intLockId++; - String dbName = lc.getDbname(); - String tblName = lc.getTablename(); - String partName = lc.getPartitionname(); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: lockChar = LOCK_EXCLUSIVE; break; - case SHARED_READ: lockChar = LOCK_SHARED; break; - case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; - } - long now = getDbTime(dbConn); - s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - + intLockId + "," + txnid + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) - + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - (isValidTxn(txnid) ? 0 : now) + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - /**to make txns shorter we could commit here and start a new txn for checkLock. 
This would - * require moving checkRetryable() down into here. Could we then run the part before this - * commit are READ_COMMITTED?*/ - return checkLock(dbConn, extLockId); - } catch (NoSuchLockException e) { - // This should never happen, as we just added the lock id - throw new MetaException("Couldn't find a lock we just created! " + e.getMessage()); - } finally { - close(rs); - closeStmt(stmt); - } - } - } private static boolean isValidTxn(long txnId) { return txnId != 0; } @@ -1624,12 +1714,17 @@ public class TxnHandler { private LockResponse checkLock(Connection dbConn, long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) { + //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations + //that would be less prone to deadlocks + throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation()); + } List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now LockResponse response = new LockResponse(); response.setLockid(extLockId); LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId)); - Savepoint save = dbConn.setSavepoint(); + Savepoint save = dbConn.setSavepoint();//todo: get rid of this StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); @@ -2063,9 +2158,8 @@ public class TxnHandler { * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the * appropriately modified row limiting query. 
*/ - private String addLimitClause(Connection dbConn, int numRows, String noSelectsqlQuery) throws MetaException { - DatabaseProduct prod = determineDatabaseProduct(dbConn); - switch (prod) { + private String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { + switch (dbProduct) { case DERBY: //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; @@ -2082,7 +2176,7 @@ public class TxnHandler { //https://msdn.microsoft.com/en-us/library/ms189463.aspx return "select TOP(" + numRows + ") " + noSelectsqlQuery; default: - String msg = "Unrecognized database product name <" + prod + ">"; + String msg = "Unrecognized database product name <" + dbProduct + ">"; LOG.error(msg); throw new MetaException(msg); } @@ -2116,7 +2210,7 @@ public class TxnHandler { stmt = dbConn.createStatement(); String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN + "' and txn_last_heartbeat < " + (now - timeout); - s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); + s = addLimitClause(250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if(!rs.next()) { @@ -2133,8 +2227,7 @@ public class TxnHandler { } } while(rs.next()); dbConn.commit(); - close(rs, stmt, dbConn); - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + close(rs, stmt, null); int numTxnsAborted = 0; for(List<Long> batchToAbort : timedOutTxns) { if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) { @@ -2337,45 +2430,11 @@ public class TxnHandler { return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; } /** - * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} TRANSACTION_READ_COMMITTED, etc. - * Different DBs support different concurrency management options. This class relies on SELECT ... FOR UPDATE - * functionality. 
Where that is not available, SERIALIZABLE isolation is used. - * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that - * if FOR UPDATE is not available, must run operation at SERIALIZABLE. - */ - private int getRequiredIsolationLevel() throws MetaException, SQLException { - if(dbProduct == null) { - Connection tmp = null; - try { - tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - determineDatabaseProduct(tmp); - } - finally { - closeDbConn(tmp); - } - } - switch (dbProduct) { - case DERBY: - return Connection.TRANSACTION_SERIALIZABLE; - case MYSQL: - case ORACLE: - case POSTGRES: - case SQLSERVER: - return Connection.TRANSACTION_READ_COMMITTED; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } - /** * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent - * construct. If the DB doesn't support, return original select. This method must always - * agree with {@link #getRequiredIsolationLevel()} + * construct. If the DB doesn't support, return original select. 
*/ - private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException { - DatabaseProduct prod = determineDatabaseProduct(dbConn); - switch (prod) { + private String addForUpdateClause(String selectStatement) throws MetaException { + switch (dbProduct) { case DERBY: //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html //sadly in Derby, FOR UPDATE doesn't meant what it should @@ -2392,7 +2451,7 @@ public class TxnHandler { //https://msdn.microsoft.com/en-us/library/ms187373.aspx return selectStatement + " with(updlock)"; default: - String msg = "Unrecognized database product name <" + prod + ">"; + String msg = "Unrecognized database product name <" + dbProduct + ">"; LOG.error(msg); throw new MetaException(msg); } @@ -2400,6 +2459,9 @@ public class TxnHandler { static String quoteString(String input) { return "'" + input + "'"; } + static String quoteChar(char c) { + return "'" + c + "'"; + } static CompactionType dbCompactionType2ThriftType(char dbValue) { switch (dbValue) { case MAJOR_TYPE: @@ -2422,4 +2484,20 @@ public class TxnHandler { return null; } } + + /** + * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations that require + * Select ... For Update to sequence operations properly. In practice that means when running + * with Derby database. See more notes at class level. 
+ */ + private void lockInternal() { + if(dbProduct == DatabaseProduct.DERBY) { + derbyLock.lock(); + } + } + private void unlockInternal() { + if(dbProduct == DatabaseProduct.DERBY) { + derbyLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f1c75b5f/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java index abceaf3..abe1e37 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java @@ -35,15 +35,14 @@ public class TestTxnHandlerNegative { public void testBadConnection() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah"); - TxnHandler txnHandler1 = new TxnHandler(conf); - MetaException e = null; + RuntimeException e = null; try { - txnHandler1.getOpenTxns(); + TxnHandler txnHandler1 = new TxnHandler(conf); } - catch(MetaException ex) { + catch(RuntimeException ex) { LOG.info("Expected error: " + ex.getMessage(), ex); e = ex; } - assert e != null : "did not get exception"; + assert e != null && e.getMessage().contains("No suitable driver found for blah") : "did not get exception"; } } http://git-wip-us.apache.org/repos/asf/hive/blob/f1c75b5f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 55ea009..a9867ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -406,7 +406,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { private void stopHeartbeat() { if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { - heartbeatTask.cancel(true); + heartbeatTask.cancel(false); heartbeatTask = null; } }
