Repository: hive
Updated Branches:
  refs/heads/master 69d7ff118 -> aaa356932


HIVE-13013 - Further Improve concurrency in TxnHandler (Eugene Koifman, 
reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aaa35693
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aaa35693
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aaa35693

Branch: refs/heads/master
Commit: aaa3569323b4c2718e8a2cf80accc70637a9121c
Parents: 69d7ff1
Author: Eugene Koifman <[email protected]>
Authored: Mon Feb 29 09:56:03 2016 -0800
Committer: Eugene Koifman <[email protected]>
Committed: Mon Feb 29 09:56:03 2016 -0800

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     |   2 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 645 ++++++++++---------
 .../metastore/txn/TestTxnHandlerNegative.java   |  10 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   2 +-
 4 files changed, 358 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aaa35693/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 70cbab7..da2b395 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -824,7 +824,7 @@ class CompactionTxnHandler extends TxnHandler {
    * but what abount markCleaned() which is called when table is had been 
deleted...
    */
   public void markFailed(CompactionInfo ci) throws MetaException {//todo: this 
should not throw
-    //todo: this shoudl take "comment" as parameter to set in CC_META_INFO to 
provide some context for the failure
+    //todo: this should take "comment" as parameter to set in CC_META_INFO to 
provide some context for the failure
     try {
       Connection dbConn = null;
       Statement stmt = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/aaa35693/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 79c4f7a..d4d0162 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -32,8 +32,6 @@ import org.apache.commons.dbcp.PoolingDataSource;
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -45,6 +43,7 @@ import java.io.IOException;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A handler to answer transaction related calls that come into the metastore
@@ -60,6 +59,28 @@ import java.util.concurrent.TimeUnit;
  * For locks that are part of transaction, we set this 0 (would rather set it 
to NULL but
  * Currently the DB schema has this NOT NULL) and only update/read heartbeat 
from corresponding
  * transaction in TXNS.
+ *
+ * In general there can be multiple metastores where this logic can execute, 
thus the DB is
+ * used to ensure proper mutexing of operations.
+ * Select ... For Update (or equivalent: either MsSql with(updlock) or actual 
Update stmt) is
+ * used to properly sequence operations.  Most notably:
+ * 1. various sequence IDs are generated with aid of this mutex
+ * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  
Transaction state
+ *  includes its actual state (Open, Aborted) as well as its lock 
list/component list.  Thus all
+ *  per transaction ops, either start by update/delete of the relevant TXNS 
row or do S4U on that row.
+ *  This allows almost all operations to run at READ_COMMITTED and minimizes 
DB deadlocks.
+ * 3. checkLock() - this is mutexed entirely since we must ensure that while 
we check if some lock
+ *  can be granted, no other (strictly speaking "earlier") lock can change 
state.
+ *
+ * The exception to this is Derby which doesn't support proper S4U.  Derby is 
always running embedded
+ * (this is the only supported configuration for Derby)
+ * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to 
properly sequence the operations.
+ *
+ * {@link #derbyLock}
+
+ * If we ever decide to run remote Derby server, according to
+ * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all 
transactions will be
+ * serialized, so that would also work though has not been tested.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -94,8 +115,6 @@ abstract class TxnHandler implements TxnStore {
 
   static private DataSource connPool;
   static private boolean doRetryOnConnPool = false;
-  private final static Object lockLock = new Object(); // Random object to 
lock on for the lock
-  // method
 
   /**
    * Number of consecutive deadlocks we have seen
@@ -112,11 +131,11 @@ abstract class TxnHandler implements TxnStore {
   private long retryInterval;
   private int retryLimit;
   private int retryNum;
+  /**
+   * Derby specific concurrency control
+   */
+  private static final ReentrantLock derbyLock = new ReentrantLock(true);
 
-  // DEADLOCK DETECTION AND HANDLING
-  // A note to developers of this class.  ALWAYS access HIVE_LOCKS before TXNS 
to avoid deadlock
-  // between simultaneous accesses.  ALWAYS access TXN_COMPONENTS before 
HIVE_LOCKS .
-  //
   // Private methods should never catch SQLException and then throw 
MetaException.  The public
   // methods depend on SQLException coming back so they can detect and handle 
deadlocks.  Private
   // methods should only throw MetaException when they explicitly know there's 
a logic error and
@@ -130,19 +149,29 @@ abstract class TxnHandler implements TxnStore {
   public TxnHandler() {
   }
 
+  /**
+   * This is logically part of c'tor and must be called prior to any other 
method.
+   * Not physically part of c'tor due to use of reflection
+   */
   public void setConf(HiveConf conf) {
     this.conf = conf;
 
     checkQFileTestHack();
 
+    Connection dbConn = null;
     // Set up the JDBC connection pool
     try {
       setupJdbcConnectionPool(conf);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      determineDatabaseProduct(dbConn);
     } catch (SQLException e) {
       String msg = "Unable to instantiate JDBC connection pooling, " + 
e.getMessage();
       LOG.error(msg);
       throw new RuntimeException(e);
     }
+    finally {
+      closeDbConn(dbConn);
+    }
 
     timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 
TimeUnit.MILLISECONDS);
     buildJumpTable();
@@ -276,29 +305,6 @@ abstract class TxnHandler implements TxnStore {
       return getOpenTxns();
     }
   }
-
-  /**
-   * Transform a {@link 
org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
-   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that 
the caller intends to
-   * read the files, and thus treats both open and aborted transactions as 
invalid.
-   * @param txns txn list from the metastore
-   * @param currentTxn Current transaction that the user has open.  If this is 
greater than 0 it
-   *                   will be removed from the exceptions list so that the 
user sees his own
-   *                   transaction as valid.
-   * @return a valid txn list.
-   */
-  public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, 
long currentTxn) {
-    long highWater = txns.getTxn_high_water_mark();
-    Set<Long> open = txns.getOpen_txns();
-    long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
-    int i = 0;
-    for(long txn: open) {
-      if (currentTxn > 0 && currentTxn == txn) continue;
-      exceptions[i++] = txn;
-    }
-    return new ValidReadTxnList(exceptions, highWater);
-  }
-
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     int numTxns = rqst.getNum_txns();
     try {
@@ -306,11 +312,12 @@ abstract class TxnHandler implements TxnStore {
       Statement stmt = null;
       ResultSet rs = null;
       try {
+        lockInternal();
         /**
          * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work 
correctly, this operation must ensure
          * that advancing the counter in NEXT_TXN_ID and adding appropriate 
entries to TXNS is atomic.
-         * Also, advancing the counter must work when multiple metastores are 
running, thus either
-         * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation.  The 
former is preferred since it prevents
+         * Also, advancing the counter must work when multiple metastores are 
running.
+         * SELECT ... FOR UPDATE is used to prevent
          * concurrent DB transactions being rolled back due to Write-Write 
conflict on NEXT_TXN_ID.
          *
          * In the current design, there can be several metastore instances 
running in a given Warehouse.
@@ -323,14 +330,14 @@ abstract class TxnHandler implements TxnStore {
          * Longer term we can consider running Active-Passive MS (at least wrt 
to ACID operations).  This
          * set could support a write-through cache for added performance.
          */
-        dbConn = getDbConn(getRequiredIsolationLevel());
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         // Make sure the user has not requested an insane amount of txns.
         int maxTxns = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
         if (numTxns > maxTxns) numTxns = maxTxns;
 
         stmt = dbConn.createStatement();
-        String s = addForUpdateClause(dbConn, "select ntxn_next from 
NEXT_TXN_ID");
+        String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -366,6 +373,7 @@ abstract class TxnHandler implements TxnStore {
           + StringUtils.stringifyException(e));
       } finally {
         close(rs, stmt, dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       return openTxns(rqst);
@@ -377,7 +385,8 @@ abstract class TxnHandler implements TxnStore {
     try {
       Connection dbConn = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -394,6 +403,7 @@ abstract class TxnHandler implements TxnStore {
           + StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       abortTxn(rqst);
@@ -406,33 +416,24 @@ abstract class TxnHandler implements TxnStore {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      ResultSet lockHandle = null;
       try {
+        lockInternal();
         /**
-         * This has to run at SERIALIZABLE to make no concurrent attempt to 
acquire locks (insert into HIVE_LOCKS)
-         * can happen.  Otherwise we may end up with orphaned locks.  While 
lock() and commitTxn() should not
-         * normally run concurrently (for same txn) but could due to bugs in 
the client which could then
-         * (w/o SERIALIZABLE) corrupt internal transaction manager state.  
Also competes with abortTxn()
-         *
-         * Sketch of an improvement:
-         * Introduce a new transaction state in TXNS, state 'c'.  This is a 
transient Committed state.
-         * commitTxn() would mark the txn 'c' in TXNS in an independent txn.  
Other operation like
-         * lock(), heartbeat(), etc would raise errors for txn in 'c' state 
and getOpenTxns(), etc would
-         * treat 'c' txn as 'open'.  Then this method could run in READ 
COMMITTED since the
-         * entry for this txn in TXNS in 'c' acts like a monitor.
-         * Since the move to 'c' state is in one txn (to make it visible) and 
the rest of the
-         * operations in another (could even be made separate txns), there is 
a possibility of failure
-         * between the 2.  Thus the AcidHouseKeeper logic to timeout txns 
should apply 'c' state txns.
-         *
-         * Or perhaps Select * TXNS where txn_id = " + txnid; for update
+         * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U 
ensures that no other
+         * operation can change this txn (such as acquiring locks). While lock() 
and commitTxn()
+         * should not normally run concurrently (for same txn) but could due 
to bugs in the client
+         * which could then corrupt internal transaction manager state.  Also 
competes with abortTxn().
          */
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        // Before we do the commit heartbeat the txn.  This is slightly odd in 
that we're going to
-        // commit it, but it does two things.  One, it makes sure the 
transaction is still valid.
-        // Two, it avoids the race condition where we time out between now and 
when we actually
-        // commit the transaction below.  And it does this all in a dead-lock 
safe way by
-        // committing the heartbeat back to the database.
-        heartbeatTxn(dbConn, txnid);
+
+        lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+        if(lockHandle == null) {
+          //this also ensures that txn is still there and in expected state 
(hasn't been timed out)
+          ensureValidTxn(dbConn, txnid, stmt);
+          shouldNeverHappen(txnid);
+        }
 
         // Move the record from txn_components into completed_txn_components 
so that the compactor
         // knows where to look to compact.
@@ -465,35 +466,213 @@ abstract class TxnHandler implements TxnStore {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(lockHandle, stmt, dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       commitTxn(rqst);
     }
   }
 
-  public LockResponse lock(LockRequest rqst)
-    throws NoSuchTxnException, TxnAbortedException, MetaException {
+  /**
+   * As much as possible (i.e. in absence of retries) we want both operations 
to be done on the same
+   * connection (but separate transactions).  This avoids some flakiness in 
BONECP where if you
+   * perform an operation on 1 connection and immediately get another from the 
pool, the 2nd one
+   * doesn't see results of the first.
+   */
+  public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, 
TxnAbortedException, MetaException {
+    ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
+    try {
+      return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, 
rqst.getTxnid());
+    }
+    catch(NoSuchLockException e) {
+      // This should never happen, as we just added the lock id
+      throw new MetaException("Couldn't find a lock we just created! " + 
e.getMessage());
+    }
+  }
+  private static final class ConnectionLockIdPair {
+    private final Connection dbConn;
+    private final long extLockId;
+    private ConnectionLockIdPair(Connection dbConn, long extLockId) {
+      this.dbConn = dbConn;
+      this.extLockId = extLockId;
+    }
+  }
+
+  /**
+   * Note that by definition select for update is divorced from update, i.e. 
you executeQuery() to read
+   * and then executeUpdate().  One other alternative would be to actually 
update the row in TXNS but
+   * to the same value as before thus forcing db to acquire write lock for 
duration of the transaction.
+   *
+   * There is no real reason to return the ResultSet here other than to make 
sure the reference to it
+   * is retained for duration of intended lock scope and is not GC'd thus 
(unlikely) causing lock
+   * to be released.
+   * @param stmt statement used to run the select-for-update; not closed here
+   * @param txnId id of the TXNS row to lock
+   * @param txnState the state this txn is expected to be in.  may be null
+   * @return null if no row was found; otherwise the ResultSet positioned on 
the locked row
+   * @throws SQLException if the query fails
+   * @throws MetaException
+   */
+  private ResultSet lockTransactionRecord(Statement stmt, long txnId, 
Character txnState) throws SQLException, MetaException {
+    //the leading space before AND is required: without it the literal txnId 
runs into the keyword ("42AND") which not every database will parse
+    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + 
(txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
+    ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
+    if(rs.next()) {
+      return rs;
+    }
+    close(rs);
+    return null;
+  }
+
+  /**
+   * This enters locks into the queue in {@link #LOCK_WAITING} mode.
+   *
+   * Isolation Level Notes:
+   * 1. We use S4U (with read_committed) to generate the next (ext) lock id.  
This serializes
+   * any 2 {@code enqueueLockWithRetry()} calls.
+   * 2. We use S4U on the relevant TXNS row to block any concurrent 
abort/commit/etc operations
+   * @see #checkLockWithRetry(Connection, long, long)
+   */
+  private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws 
NoSuchTxnException, TxnAbortedException, MetaException {
+    boolean success = false;
+    Connection dbConn = null;
+    try {
+      Statement stmt = null;
+      ResultSet rs = null;
+      ResultSet lockHandle = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        long txnid = rqst.getTxnid();
+        stmt = dbConn.createStatement();
+        if (isValidTxn(txnid)) {
+          //this also ensures that txn is still there in expected state
+          lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+          if(lockHandle == null) {
+            ensureValidTxn(dbConn, txnid, stmt);
+            shouldNeverHappen(txnid);
+          }
+        }
+        /** Get the next lock id.
+         * This has to be atomic with adding entries to HIVE_LOCK entries (1st 
add in W state) to prevent a race.
+         * Suppose ID gen is a separate txn and 2 concurrent lock() methods 
are running.  1st one generates nl_next=7,
+         * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and 
acquires the locks.  Then 7 unblocks,
+         * and adds its W locks but it won't see locks from 8 since to be 
'fair' {@link #checkLock(java.sql.Connection, long)}
+         * doesn't block on locks acquired later than one it's checking*/
+        String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID");
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+          throw new MetaException("Transaction tables not properly " +
+            "initialized, no record found in next_lock_id");
+        }
+        long extLockId = rs.getLong(1);
+        s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+        LOG.debug("Going to execute update <" + s + ">");
+        stmt.executeUpdate(s);
+
+        if (txnid > 0) {
+          // For each component in this lock request,
+          // add an entry to the txn_components table
+          // This must be done before HIVE_LOCKS is accessed
+          for (LockComponent lc : rqst.getComponent()) {
+            String dbName = lc.getDbname();
+            String tblName = lc.getTablename();
+            String partName = lc.getPartitionname();
+            s = "insert into TXN_COMPONENTS " +
+              "(tc_txnid, tc_database, tc_table, tc_partition) " +
+              "values (" + txnid + ", '" + dbName + "', " +
+              (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+              (partName == null ? "null" : "'" + partName + "'") + ")";
+            LOG.debug("Going to execute update <" + s + ">");
+            stmt.executeUpdate(s);
+          }
+        }
+
+        long intLockId = 0;
+        for (LockComponent lc : rqst.getComponent()) {
+          intLockId++;
+          String dbName = lc.getDbname();
+          String tblName = lc.getTablename();
+          String partName = lc.getPartitionname();
+          LockType lockType = lc.getType();
+          char lockChar = 'z';
+          switch (lockType) {
+            case EXCLUSIVE:
+              lockChar = LOCK_EXCLUSIVE;
+              break;
+            case SHARED_READ:
+              lockChar = LOCK_SHARED;
+              break;
+            case SHARED_WRITE:
+              lockChar = LOCK_SEMI_SHARED;
+              break;
+          }
+          long now = getDbTime(dbConn);
+          s = "insert into HIVE_LOCKS " +
+            " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
+            "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, 
hl_user, hl_host)" +
+            " values (" + extLockId + ", " +
+            +intLockId + "," + txnid + ", '" +
+            dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'")
+            + ", " + (partName == null ? "null" : "'" + partName + "'") +
+            ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+            //for locks associated with a txn, we always heartbeat txn and 
timeout based on that
+            (isValidTxn(txnid) ? 0 : now) + ", '" +
+            rqst.getUser() + "', '" + rqst.getHostname() + "')";
+          LOG.debug("Going to execute update <" + s + ">");
+          stmt.executeUpdate(s);
+        }
+        dbConn.commit();
+        success = true;
+        return new ConnectionLockIdPair(dbConn, extLockId);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")");
+        throw new MetaException("Unable to update transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(lockHandle);
+        close(rs, stmt, null);
+        if (!success) {
+          /* This needs to return a "live" connection to be used by operation 
that follows it.
+          Thus it only closes Connection on failure/retry. */
+          closeDbConn(dbConn);
+        }
+        unlockInternal();
+      }
+    }
+    catch(RetryException e) {
+      return enqueueLockWithRetry(rqst);
+    }
+  }
+  private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, 
long txnId)
+    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, 
MetaException {
     try {
-      Connection dbConn = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        return lock(dbConn, rqst);
+        lockInternal();
+        if(dbConn.isClosed()) {
+          //should only get here if retrying this op
+          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        }
+        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+        return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "lock(" + rqst + ")");
+        checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + 
txnId + ")");
         throw new MetaException("Unable to update transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        unlockInternal();
         closeDbConn(dbConn);
       }
-    } catch (RetryException e) {
-      return lock(rqst);
+    }
+    catch(RetryException e) {
+      return checkLockWithRetry(dbConn, extLockId, txnId);
     }
   }
-
   /**
    * Why doesn't this get a txnid as parameter?  The caller should either know 
the txnid or know there isn't one.
    * Either way getTxnIdFromLockId() will not be needed.  This would be a 
Thrift change.
@@ -515,6 +694,7 @@ abstract class TxnHandler implements TxnStore {
       Connection dbConn = null;
       long extLockId = rqst.getLockid();
       try {
+        lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         // Heartbeat on the lockid first, to assure that our lock is still 
valid.
         // Then look up the lock info (hopefully in the cache).  If these locks
@@ -529,6 +709,9 @@ abstract class TxnHandler implements TxnStore {
         else {
           heartbeatLock(dbConn, extLockId);
         }
+        //todo: strictly speaking there is a bug here.  heartbeat*() commits 
but both heartbeat and
+        //checkLock() are in the same retry block, so if checkLock() throws, 
heartbeat is also retried
+        //extra heartbeat is logically harmless, but ...
         dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
@@ -539,6 +722,7 @@ abstract class TxnHandler implements TxnStore {
           JavaUtils.lockIdToString(extLockId) + " " + 
StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       return checkLock(rqst);
@@ -549,6 +733,9 @@ abstract class TxnHandler implements TxnStore {
   /**
    * This would have been made simpler if all locks were associated with a 
txn.  Then only txn needs to
    * be heartbeated, committed, etc.  no need for client to track individual 
locks.
+   * When removing locks not associated with txn this potentially conflicts 
with
+   * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will 
be locked as needed by db.
+   * since this only removes from HIVE_LOCKS at worst some lock acquire is 
delayed
    */
   public void unlock(UnlockRequest rqst)
     throws NoSuchLockException, TxnOpenException, MetaException {
@@ -573,6 +760,8 @@ abstract class TxnHandler implements TxnStore {
         //hl_txnid <> 0 means it's associated with a transaction
         String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + 
extLockId + " AND (hl_txnid = 0 OR" +
           " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
+        //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for 
multi-statement txns where
+        //some query attempted to lock (thus LOCK_WAITING state) but is giving 
up due to timeout for example
         LOG.debug("Going to execute update <" + s + ">");
         int rc = stmt.executeUpdate(s);
         if (rc < 1) {
@@ -735,8 +924,9 @@ abstract class TxnHandler implements TxnStore {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
           try {
-            //todo: this is expensive call: at least 2 update queries per txn
-            //is this really worth it?
+            //todo: do all updates in 1 SQL statement and check update count
+            //if update count is less than was requested, go into more 
expensive checks
+            //for each txn
             heartbeatTxn(dbConn, txn);
           } catch (NoSuchTxnException e) {
             nosuch.add(txn);
@@ -765,11 +955,12 @@ abstract class TxnHandler implements TxnStore {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(getRequiredIsolationLevel());
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
 
         // Get the id for the next entry in the queue
-        String s = addForUpdateClause(dbConn, "select ncq_next from 
NEXT_COMPACTION_QUEUE_ID");
+        String s = addForUpdateClause("select ncq_next from 
NEXT_COMPACTION_QUEUE_ID");
         LOG.debug("going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -836,6 +1027,7 @@ abstract class TxnHandler implements TxnStore {
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       return compact(rqst);
@@ -909,16 +1101,25 @@ abstract class TxnHandler implements TxnStore {
     }
   }
 
+  private static void shouldNeverHappen(long txnid) {
+    throw new RuntimeException("This should never happen: " + 
JavaUtils.txnIdToString(txnid));
+  }
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
+    ResultSet lockHandle = null;
     try {
       try {
+        lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        // Heartbeat this first to make sure the transaction is still valid.
-        heartbeatTxn(dbConn, rqst.getTxnid());
+        lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+        if(lockHandle == null) {
+          //ensures txn is still there and in expected state
+          ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
+          shouldNeverHappen(rqst.getTxnid());
+        }
         for (String partName : rqst.getPartitionnames()) {
           StringBuilder buff = new StringBuilder();
           buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, 
tc_table, tc_partition) values (");
@@ -943,8 +1144,8 @@ abstract class TxnHandler implements TxnStore {
         throw new MetaException("Unable to insert into from transaction 
database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(lockHandle, stmt, dbConn);
+        unlockInternal();
       }
     } catch (RetryException e) {
       addDynamicPartitions(rqst);
@@ -990,13 +1191,15 @@ abstract class TxnHandler implements TxnStore {
 
   protected Connection getDbConn(int isolationLevel) throws SQLException {
     int rc = doRetryOnConnPool ? 10 : 1;
+    Connection dbConn = null;
     while (true) {
       try {
-        Connection dbConn = connPool.getConnection();
+        dbConn = connPool.getConnection();
         dbConn.setAutoCommit(false);
         dbConn.setTransactionIsolation(isolationLevel);
         return dbConn;
       } catch (SQLException e){
+        closeDbConn(dbConn);
         if ((--rc) <= 0) throw e;
         LOG.error("There is a problem with a connection from the pool, 
retrying(rc=" + rc + "): " +
           getMessage(e), e);
@@ -1013,7 +1216,9 @@ abstract class TxnHandler implements TxnStore {
   }
   protected void closeDbConn(Connection dbConn) {
     try {
-      if (dbConn != null && !dbConn.isClosed()) dbConn.close();
+      if (dbConn != null && !dbConn.isClosed()) {
+        dbConn.close();
+      }
     } catch (SQLException e) {
       LOG.warn("Failed to close db connection " + getMessage(e));
     }
@@ -1077,8 +1282,8 @@ abstract class TxnHandler implements TxnStore {
     // Derby and newer MySQL driver use the new SQLTransactionRollbackException
     boolean sendRetrySignal = false;
     try {
-      if (dbProduct == null && conn != null) {
-        determineDatabaseProduct(conn);
+      if(dbProduct == null) {
+        throw new IllegalStateException("DB Type not determined yet.");
       }
       if (e instanceof SQLTransactionRollbackException ||
         ((dbProduct == DatabaseProduct.MYSQL || dbProduct == 
DatabaseProduct.POSTGRES ||
@@ -1144,8 +1349,7 @@ abstract class TxnHandler implements TxnStore {
     try {
       stmt = conn.createStatement();
       String s;
-      DatabaseProduct prod = determineDatabaseProduct(conn);
-      switch (prod) {
+      switch (dbProduct) {
         case DERBY:
           s = "values current_timestamp";
           break;
@@ -1161,7 +1365,7 @@ abstract class TxnHandler implements TxnStore {
           break;
 
         default:
-          String msg = "Unknown database product: " + prod.toString();
+          String msg = "Unknown database product: " + dbProduct.toString();
           LOG.error(msg);
           throw new MetaException(msg);
       }
@@ -1197,16 +1401,15 @@ abstract class TxnHandler implements TxnStore {
    * Determine the database product type
    * @param conn database connection
    * @return database product type
-   * @throws MetaException if the type cannot be determined or is unknown
    */
-  protected DatabaseProduct determineDatabaseProduct(Connection conn) throws 
MetaException {
+  private DatabaseProduct determineDatabaseProduct(Connection conn) {
     if (dbProduct == null) {
-      try {//todo: make this work when conn == null
+      try {
         String s = conn.getMetaData().getDatabaseProductName();
         if (s == null) {
           String msg = "getDatabaseProductName returns null, can't determine 
database product";
           LOG.error(msg);
-          throw new MetaException(msg);
+          throw new IllegalStateException(msg);
         } else if (s.equals("Apache Derby")) {
           dbProduct = DatabaseProduct.DERBY;
         } else if (s.equals("Microsoft SQL Server")) {
@@ -1220,13 +1423,13 @@ abstract class TxnHandler implements TxnStore {
         } else {
           String msg = "Unrecognized database product name <" + s + ">";
           LOG.error(msg);
-          throw new MetaException(msg);
+          throw new IllegalStateException(msg);
         }
 
       } catch (SQLException e) {
         String msg = "Unable to get database product name: " + e.getMessage();
         LOG.error(msg);
-        throw new MetaException(msg);
+        throw new IllegalStateException(msg);
       }
     }
     return dbProduct;
@@ -1398,7 +1601,7 @@ abstract class TxnHandler implements TxnStore {
         // We may have already created the tables and thus don't need to redo 
it.
         if (!e.getMessage().contains("already exists")) {
           throw new RuntimeException("Unable to set up transaction database 
for" +
-            " testing: " + e.getMessage());
+            " testing: " + e.getMessage(), e);
         }
       }
     }
@@ -1410,6 +1613,9 @@ abstract class TxnHandler implements TxnStore {
   /**
    * TODO: expose this as an operation to client.  Useful for streaming API to 
abort all remaining
    * trasnactions in a batch on IOExceptions.
+   * Caller must roll back the transaction if not all transactions were aborted, since this method
+   * will not attempt to delete the associated locks in that case.
+   *
    * @param dbConn An active connection
    * @param txnids list of transactions to abort
    * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure 
this doesn't Abort txn which were
@@ -1423,17 +1629,12 @@ abstract class TxnHandler implements TxnStore {
     if (txnids.isEmpty()) {
       return 0;
     }
-    if(Connection.TRANSACTION_SERIALIZABLE != 
dbConn.getTransactionIsolation()) {
-      /** Running this at SERIALIZABLE prevents new locks being added for this 
txnid(s) concurrently
-        * which would cause them to become orphaned.
-        */
-      throw new IllegalStateException("Expected SERIALIZABLE isolation. Found 
" + dbConn.getTransactionIsolation());
-    }
     try {
       stmt = dbConn.createStatement();
-
-      // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
-      StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where 
hl_txnid in (");
+      //This is an update statement, so at any isolation level it takes write locks and thus blocks
+      //all other operations that use S4U (Select ... For Update) on the TXNS row.
+      StringBuilder buf = new StringBuilder("update TXNS set txn_state = '" + 
TXN_ABORTED +
+        "' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
       boolean first = true;
       for (Long id : txnids) {
         if (first) first = false;
@@ -1441,13 +1642,22 @@ abstract class TxnHandler implements TxnStore {
         buf.append(id);
       }
       buf.append(')');
+      if(max_heartbeat > 0) {
+        buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+      }
       LOG.debug("Going to execute update <" + buf.toString() + ">");
-      stmt.executeUpdate(buf.toString());
+      updateCnt = stmt.executeUpdate(buf.toString());
+      if(updateCnt < txnids.size()) {
+        /**
+         * We have to bail out in this case since we don't know which transactions were not
+         * aborted and thus don't know which locks to delete.
+         * This may happen due to a race between the {@link #heartbeat(HeartbeatRequest)}
+         * operation and {@link #performTimeOuts()}.
+         */
+        return updateCnt;
+      }
 
-      //todo: seems like we should do this first and if it misses, don't 
bother with
-      //delete from HIVE_LOCKS since it will be rolled back
-      buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED +
-        "' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
+      buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
       first = true;
       for (Long id : txnids) {
         if (first) first = false;
@@ -1455,155 +1665,14 @@ abstract class TxnHandler implements TxnStore {
         buf.append(id);
       }
       buf.append(')');
-      if(max_heartbeat > 0) {
-        buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
-      }
       LOG.debug("Going to execute update <" + buf.toString() + ">");
-      updateCnt = stmt.executeUpdate(buf.toString());
-
+      stmt.executeUpdate(buf.toString());
     } finally {
       closeStmt(stmt);
     }
     return updateCnt;
   }
 
-  /**
-   * Isolation Level Notes:
-   * Run at SERIALIZABLE to make sure no one is adding new locks while we are 
checking conflicts here.
-   * 
-   * Ramblings:
-   * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 
1 txn@RC
-   * since this is just in Wait state.
-   * (Then we'd need to ensure that in !wait case we don't rely on rollback 
and again in case of
-   * failure, the W locks will timeout if failure does not propagate to client 
in some way, or it
-   * will and client will Abort).
-   * Actually, whether we can do this depends on what happens when you try to 
get a lock and notice
-   * a conflicting locks in W mode do we wait in this case?  if so it's a 
problem because while you
-   * are checking new locks someone may insert new  W locks that you don't 
see...
-   * On the other hand, this attempts to be 'fair', i.e. process locks in 
order so could we assume
-   * that additional W locks will have higher IDs????
-   *
-   * We can use Select for Update to generate the next LockID.  In fact we can 
easily do this in a separate txn.
-   * This avoids contention on NEXT_LOCK_ID.  The rest of the logic will be 
still need to be done at Serializable, I think,
-   * but it will not be updating the same row from 2 DB.
-   *
-   * Request a lock
-   * @param dbConn database connection
-   * @param rqst lock information
-   * @return information on whether the lock was acquired.
-   * @throws NoSuchTxnException
-   * @throws TxnAbortedException
-   */
-  private LockResponse lock(Connection dbConn, LockRequest rqst)
-    throws NoSuchTxnException,  TxnAbortedException, MetaException, 
SQLException {
-    // We want to minimize the number of concurrent lock requests being 
issued.  If we do not we
-    // get a large number of deadlocks in the database, since this method has 
to both clean
-    // timedout locks and insert new locks.  This synchronization barrier will 
not eliminate all
-    // deadlocks, and the code is still resilient in the face of a database 
deadlock.  But it
-    // will reduce the number.  This could have been done via a lock table 
command in the
-    // underlying database, but was not for two reasons.  One, different 
databases have different
-    // syntax for lock table, making it harder to use.  Two, that would lock 
the HIVE_LOCKS table
-    // and prevent other operations (such as committing transactions, showing 
locks,
-    // etc.) that should not interfere with this one.
-    synchronized (lockLock) {
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        long txnid = rqst.getTxnid();
-        if (txnid > 0) {
-          // Heartbeat the transaction so we know it is valid and we avoid it 
timing out while we
-          // are locking.
-          heartbeatTxn(dbConn, txnid);
-        }
-        stmt = dbConn.createStatement();
-
-       /** Get the next lock id.
-        * This has to be atomic with adding entries to HIVE_LOCK entries (1st 
add in W state) to prevent a race.
-        * Suppose ID gen is a separate txn and 2 concurrent lock() methods are 
running.  1st one generates nl_next=7,
-        * 2nd nl_next=8.  Then 8 goes first to insert into HIVE_LOCKS and 
aquires the locks.  Then 7 unblocks,
-        * and add it's W locks but it won't see locks from 8 since to be 
'fair' {@link #checkLock(java.sql.Connection, long)}
-        * doesn't block on locks acquired later than one it's checking*/
-        String s = addForUpdateClause(dbConn, "select nl_next from 
NEXT_LOCK_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        if (txnid > 0) {
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          // This must be done before HIVE_LOCKS is accessed
-          
-          //Isolation note:
-          //the !wait option is not actually used anywhere.  W/o that,
-          // if we make CompactionTxnHandler.markCleaned() not delete anything 
above certain txn_id
-          //then there is not reason why this insert into TXN_COMPONENTS needs 
to run at Serializable.
-          //
-          // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK 
at READ_COMMITTED as long
-          //as check lock is at serializable (or any other way to make sure 
it's exclusive)
-          for (LockComponent lc : rqst.getComponent()) {
-            String dbName = lc.getDbname();
-            String tblName = lc.getTablename();
-            String partName = lc.getPartitionname();
-            s = "insert into TXN_COMPONENTS " +
-              "(tc_txnid, tc_database, tc_table, tc_partition) " +
-              "values (" + txnid + ", '" + dbName + "', " +
-              (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-              (partName == null ? "null" : "'" +  partName + "'") + ")";
-            LOG.debug("Going to execute update <" + s + ">");
-            stmt.executeUpdate(s);
-          }
-        }
-
-        long intLockId = 0;
-        for (LockComponent lc : rqst.getComponent()) {
-          intLockId++;
-          String dbName = lc.getDbname();
-          String tblName = lc.getTablename();
-          String partName = lc.getPartitionname();
-          LockType lockType = lc.getType();
-          char lockChar = 'z';
-          switch (lockType) {
-            case EXCLUSIVE: lockChar = LOCK_EXCLUSIVE; break;
-            case SHARED_READ: lockChar = LOCK_SHARED; break;
-            case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break;
-          }
-          long now = getDbTime(dbConn);
-          s = "insert into HIVE_LOCKS " +
-            " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
-            "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, 
hl_user, hl_host)" +
-            " values (" + extLockId + ", " +
-            + intLockId + "," + txnid + ", '" +
-            dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
-            + ", " + (partName == null ? "null" : "'" + partName + "'") +
-            ", '" + LOCK_WAITING + "', " +  "'" + lockChar + "', " +
-            //for locks associated with a txn, we always heartbeat txn and 
timeout based on that
-            (isValidTxn(txnid) ? 0 : now) + ", '" +
-            rqst.getUser() + "', '" + rqst.getHostname() + "')";
-          LOG.debug("Going to execute update <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-        /**to make txns shorter we could commit here and start a new txn for 
checkLock.  This would
-         * require moving checkRetryable() down into here.  Could we then run 
the part before this
-         * commit are READ_COMMITTED?*/
-        return checkLock(dbConn, extLockId);
-      } catch (NoSuchLockException e) {
-        // This should never happen, as we just added the lock id
-        throw new MetaException("Couldn't find a lock we just created! " + 
e.getMessage());
-      } finally {
-        close(rs);
-        closeStmt(stmt);
-      }
-    }
-  }
   private static boolean isValidTxn(long txnId) {
     return txnId != 0;
   }
@@ -1618,12 +1687,17 @@ abstract class TxnHandler implements TxnStore {
   private LockResponse checkLock(Connection dbConn,
                                  long extLockId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, 
MetaException, SQLException {
+    if(dbConn.getTransactionIsolation() != 
Connection.TRANSACTION_SERIALIZABLE) {
+      //Longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations;
+      //that would be less prone to deadlocks.
+      throw new IllegalStateException("Unexpected Isolation Level: " + 
dbConn.getTransactionIsolation());
+    }
     List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, 
extLockId);//being acquired now
     LockResponse response = new LockResponse();
     response.setLockid(extLockId);
 
     LOG.debug("checkLock(): Setting savepoint. extLockId=" + 
JavaUtils.lockIdToString(extLockId));
-    Savepoint save = dbConn.setSavepoint();
+    Savepoint save = dbConn.setSavepoint();//todo: get rid of this
     StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
       "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
       "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
@@ -2057,9 +2131,8 @@ abstract class TxnHandler implements TxnStore {
    * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will 
return the
    * appropriately modified row limiting query.
    */
-  private String addLimitClause(Connection dbConn, int numRows, String 
noSelectsqlQuery) throws MetaException {
-    DatabaseProduct prod = determineDatabaseProduct(dbConn);
-    switch (prod) {
+  private String addLimitClause(int numRows, String noSelectsqlQuery) throws 
MetaException {
+    switch (dbProduct) {
       case DERBY:
         //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html
         return "select " + noSelectsqlQuery + " fetch first " + numRows + " 
rows only";
@@ -2076,7 +2149,7 @@ abstract class TxnHandler implements TxnStore {
         //https://msdn.microsoft.com/en-us/library/ms189463.aspx
         return "select TOP(" + numRows + ") " + noSelectsqlQuery;
       default:
-        String msg = "Unrecognized database product name <" + prod + ">";
+        String msg = "Unrecognized database product name <" + dbProduct + ">";
         LOG.error(msg);
         throw new MetaException(msg);
     }
@@ -2110,7 +2183,7 @@ abstract class TxnHandler implements TxnStore {
         stmt = dbConn.createStatement();
         String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
           "' and txn_last_heartbeat <  " + (now - timeout);
-        s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
+        s = addLimitClause(250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if(!rs.next()) {
@@ -2127,8 +2200,7 @@ abstract class TxnHandler implements TxnStore {
           }
         } while(rs.next());
         dbConn.commit();
-        close(rs, stmt, dbConn);
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        close(rs, stmt, null);
         int numTxnsAborted = 0;
         for(List<Long> batchToAbort : timedOutTxns) {
           if(abortTxns(dbConn, batchToAbort, now - timeout) == 
batchToAbort.size()) {
@@ -2331,45 +2403,11 @@ abstract class TxnHandler implements TxnStore {
     return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + 
ex.getErrorCode() + ")";
   }
   /**
-   * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} 
TRANSACTION_READ_COMMITTED, etc.
-   * Different DBs support different concurrency management options.  This 
class relies on SELECT ... FOR UPDATE
-   * functionality.  Where that is not available, SERIALIZABLE isolation is 
used.
-   * This method must always agree with {@link 
#addForUpdateClause(java.sql.Connection, String)}, in that
-   * if FOR UPDATE is not available, must run operation at SERIALIZABLE.
-   */
-  private int getRequiredIsolationLevel() throws MetaException, SQLException {
-    if(dbProduct == null) {
-      Connection tmp = null;
-      try {
-        tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        determineDatabaseProduct(tmp);
-      }
-      finally {
-        closeDbConn(tmp);
-      }
-    }
-    switch (dbProduct) {
-      case DERBY:
-        return Connection.TRANSACTION_SERIALIZABLE;
-      case MYSQL:
-      case ORACLE:
-      case POSTGRES:
-      case SQLSERVER:
-        return Connection.TRANSACTION_READ_COMMITTED;
-      default:
-        String msg = "Unrecognized database product name <" + dbProduct + ">";
-        LOG.error(msg);
-        throw new MetaException(msg);
-    }
-  }
-  /**
    * Given a {@code selectStatement}, decorated it with FOR UPDATE or 
semantically equivalent
-   * construct.  If the DB doesn't support, return original select.  This 
method must always
-   * agree with {@link #getRequiredIsolationLevel()}
+   * construct.  If the DB doesn't support, return original select.
    */
-  private String addForUpdateClause(Connection dbConn, String selectStatement) 
throws MetaException {
-    DatabaseProduct prod = determineDatabaseProduct(dbConn);
-    switch (prod) {
+  private String addForUpdateClause(String selectStatement) throws 
MetaException {
+    switch (dbProduct) {
       case DERBY:
         //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
         //sadly in Derby, FOR UPDATE doesn't meant what it should
@@ -2386,7 +2424,7 @@ abstract class TxnHandler implements TxnStore {
         //https://msdn.microsoft.com/en-us/library/ms187373.aspx
         return selectStatement + " with(updlock)";
       default:
-        String msg = "Unrecognized database product name <" + prod + ">";
+        String msg = "Unrecognized database product name <" + dbProduct + ">";
         LOG.error(msg);
         throw new MetaException(msg);
     }
@@ -2394,6 +2432,9 @@ abstract class TxnHandler implements TxnStore {
   static String quoteString(String input) {
     return "'" + input + "'";
   }
+  static String quoteChar(char c) {
+    return "'" + c + "'";
+  }
   static CompactionType dbCompactionType2ThriftType(char dbValue) {
     switch (dbValue) {
       case MAJOR_TYPE:
@@ -2416,4 +2457,20 @@ abstract class TxnHandler implements TxnStore {
         return null;
     }
   }
+
+  /**
+   * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations
+   * that require Select ... For Update to sequence operations properly.  In practice the lock is
+   * only taken when running with the Derby database.  See more notes at class level.
+   */
+  private void lockInternal() {
+    if(dbProduct == DatabaseProduct.DERBY) {
+      derbyLock.lock();
+    }
+  }
+  private void unlockInternal() {
+    if(dbProduct == DatabaseProduct.DERBY) {
+      derbyLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aaa35693/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
index 17bd01d..ddee0fb 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.tools.ant.RuntimeConfigurable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,15 +36,14 @@ public class TestTxnHandlerNegative {
   public void testBadConnection() throws Exception {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
-    TxnStore txnHandler1 = TxnUtils.getTxnStore(conf);
-    MetaException e = null;
+    RuntimeException e = null;
     try {
-      txnHandler1.getOpenTxns();
+      TxnUtils.getTxnStore(conf);
     }
-    catch(MetaException ex) {
+    catch(RuntimeException ex) {
       LOG.info("Expected error: " + ex.getMessage(), ex);
       e = ex;
     }
-    assert e != null : "did not get exception";
+    assert e != null && e.getMessage().contains("No suitable driver found for 
blah") : "did not get exception";
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aaa35693/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 47dbbb3..e8ebe55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -407,7 +407,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   private void stopHeartbeat() {
     if (heartbeatTask != null && !heartbeatTask.isCancelled() && 
!heartbeatTask.isDone()) {
-      heartbeatTask.cancel(true);
+      heartbeatTask.cancel(false);
       heartbeatTask = null;
     }
   }

Reply via email to