Repository: hive Updated Branches: refs/heads/branch-1 4a786ca41 -> 236cc2ac9
backport HIVE-11833 from 2.0 Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1c1bf2a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1c1bf2a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1c1bf2a Branch: refs/heads/branch-1 Commit: b1c1bf2afa24b1cd10e74f2d3ea3a6377f214c26 Parents: 4a786ca Author: Eugene Koifman <[email protected]> Authored: Tue Nov 17 15:04:48 2015 -0800 Committer: Eugene Koifman <[email protected]> Committed: Tue Nov 17 15:04:48 2015 -0800 ---------------------------------------------------------------------- .../hadoop/hive/metastore/txn/TxnHandler.java | 59 ++++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b1c1bf2a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index b685f3d..0b3d565 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -620,7 +620,7 @@ public class TxnHandler { try { Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); heartbeatLock(dbConn, ids.getLockid()); heartbeatTxn(dbConn, ids.getTxnid()); } catch (SQLException e) { @@ -1731,32 +1731,17 @@ public class TxnHandler { try { stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; - ResultSet rs2 = stmt.executeQuery(s); - boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; - LOG.debug("Going to rollback"); + ensureValidTxn(dbConn, txnid, stmt); + String s = "update TXNS set txn_last_heartbeat = " + now + + " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + ensureValidTxn(dbConn, txnid, stmt); // This should now throw some useful exception. + LOG.warn("Can neither heartbeat txn nor confirm it as invalid: " + JavaUtils.txnIdToString(txnid)); dbConn.rollback(); - if(alreadyCommitted) { - //makes the message more informative - helps to find bugs in client code - throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); - } throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + - " already aborted");//todo: add time of abort, which is not currently tracked - } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); //update locks for this txn to the same heartbeat s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); @@ -1768,6 +1753,32 @@ public class TxnHandler { } } + private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt) + throws SQLException, NoSuchTxnException, TxnAbortedException { + // We need to check whether this transaction is valid and open + String s = "select txn_state from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; + ResultSet rs2 = stmt.executeQuery(s); + boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; + LOG.debug("Going to rollback"); + dbConn.rollback(); + if(alreadyCommitted) { + //makes the message more informative - helps to find bugs in client code + throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); + } + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); + } + if (rs.getString(1).charAt(0) == TXN_ABORTED) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + + " already aborted");//todo: add time of abort, which is not currently tracked + } + } + // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException {
