HIVE-11833 : TxnHandler heartbeat txn doesn't need serializable DB txn level (Sergey Shelukhin, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c0fb13b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c0fb13b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c0fb13b Branch: refs/heads/beeline-cli Commit: 4c0fb13b1313f8cafe866105515978524a032c76 Parents: e9c8d7c Author: Sergey Shelukhin <[email protected]> Authored: Fri Sep 18 13:42:18 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Fri Sep 18 13:42:18 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/metastore/txn/TxnHandler.java | 61 ++++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4c0fb13b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 795b2d9..9ecb82a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -621,7 +621,7 @@ public class TxnHandler { try { Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); heartbeatLock(dbConn, ids.getLockid()); heartbeatTxn(dbConn, ids.getTxnid()); } catch (SQLException e) { @@ -1727,32 +1727,17 @@ public class TxnHandler { try { stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; - ResultSet rs2 = stmt.executeQuery(s); - boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; - LOG.debug("Going to rollback"); - dbConn.rollback(); - if(alreadyCommitted) { - //makes the message more informative - helps to find bugs in client code - throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); - } - throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); - } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { - LOG.debug("Going to rollback"); + ensureValidTxn(dbConn, txnid, stmt); + String s = "update TXNS set txn_last_heartbeat = " + now + + " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + ensureValidTxn(dbConn, txnid, stmt); // This should now throw some useful exception. + LOG.warn("Can neither heartbeat txn nor confirm it as invalid."); dbConn.rollback(); - throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + - " already aborted");//todo: add time of abort, which is not currently tracked + throw new NoSuchTxnException("No such txn: " + txnid); } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); LOG.debug("Going to commit"); dbConn.commit(); } finally { @@ -1760,6 +1745,32 @@ public class TxnHandler { } } + private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt) + throws SQLException, NoSuchTxnException, TxnAbortedException { + // We need to check whether this transaction is valid and open + String s = "select txn_state from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; + ResultSet rs2 = stmt.executeQuery(s); + boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; + LOG.debug("Going to rollback"); + dbConn.rollback(); + if(alreadyCommitted) { + //makes the message more informative - helps to find bugs in client code + throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); + } + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); + } + if (rs.getString(1).charAt(0) == TXN_ABORTED) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + + " already aborted");//todo: add time of abort, which is not currently tracked + } + } + // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException {
