Author: ekoifman
Date: Sat Jan 24 01:05:12 2015
New Revision: 1654442

URL: http://svn.apache.org/r1654442
Log:
HIVE-9390 Enhance retry logic wrt DB access in TxnHandler

Modified:
    
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    
hive/branches/branch-0.14/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java

Modified: 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1654442&r1=1654441&r2=1654442&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
 (original)
+++ 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
 Sat Jan 24 01:05:12 2015
@@ -5318,126 +5318,74 @@ public class HiveMetaStore extends Thrif
     // Transaction and locking methods
     @Override
     public GetOpenTxnsResponse get_open_txns() throws TException {
-      try {
-        return getTxnHandler().getOpenTxns();
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().getOpenTxns();
     }
 
     // Transaction and locking methods
     @Override
     public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
-      try {
-        return getTxnHandler().getOpenTxnsInfo();
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().getOpenTxnsInfo();
     }
 
     @Override
     public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException {
-      try {
-        return getTxnHandler().openTxns(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().openTxns(rqst);
     }
 
     @Override
     public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException, 
TException {
-      try {
-        getTxnHandler().abortTxn(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      getTxnHandler().abortTxn(rqst);
     }
 
     @Override
     public void commit_txn(CommitTxnRequest rqst)
         throws NoSuchTxnException, TxnAbortedException, TException {
-      try {
-        getTxnHandler().commitTxn(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      getTxnHandler().commitTxn(rqst);
     }
 
     @Override
     public LockResponse lock(LockRequest rqst)
         throws NoSuchTxnException, TxnAbortedException, TException {
-      try {
-        return getTxnHandler().lock(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().lock(rqst);
     }
 
     @Override
     public LockResponse check_lock(CheckLockRequest rqst)
         throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, 
TException {
-      try {
-        return getTxnHandler().checkLock(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().checkLock(rqst);
     }
 
     @Override
     public void unlock(UnlockRequest rqst)
         throws NoSuchLockException, TxnOpenException, TException {
-      try {
-        getTxnHandler().unlock(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      getTxnHandler().unlock(rqst);
     }
 
     @Override
     public ShowLocksResponse show_locks(ShowLocksRequest rqst) throws 
TException {
-      try {
-        return getTxnHandler().showLocks(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().showLocks(rqst);
     }
 
     @Override
     public void heartbeat(HeartbeatRequest ids)
         throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, 
TException {
-      try {
-        getTxnHandler().heartbeat(ids);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      getTxnHandler().heartbeat(ids);
     }
 
     @Override
     public HeartbeatTxnRangeResponse 
heartbeat_txn_range(HeartbeatTxnRangeRequest rqst)
       throws TException {
-      try {
-        return getTxnHandler().heartbeatTxnRange(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().heartbeatTxnRange(rqst);
     }
 
     @Override
     public void compact(CompactionRequest rqst) throws TException {
-      try {
-        getTxnHandler().compact(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      getTxnHandler().compact(rqst);
     }
 
     @Override
     public ShowCompactResponse show_compact(ShowCompactRequest rqst) throws 
TException {
-      try {
-        return getTxnHandler().showCompact(rqst);
-      } catch (MetaException e) {
-        throw new TException(e);
-      }
+      return getTxnHandler().showCompact(rqst);
     }
 
     @Override

Modified: 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654442&r1=1654441&r2=1654442&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 (original)
+++ 
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 Sat Jan 24 01:05:12 2015
@@ -52,51 +52,58 @@ public class CompactionTxnHandler extend
    * or runAs set since these are only potential compactions not actual ones.
    */
   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException {
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<CompactionInfo>();
     Statement stmt = null;
     try {
-      stmt = dbConn.createStatement();
-      // Check for completed transactions
-      String s = "select distinct ctc_database, ctc_table, " +
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        // Check for completed transactions
+        String s = "select distinct ctc_database, ctc_table, " +
           "ctc_partition from COMPLETED_TXN_COMPONENTS";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        CompactionInfo info = new CompactionInfo();
-        info.dbname = rs.getString(1);
-        info.tableName = rs.getString(2);
-        info.partName = rs.getString(3);
-        response.add(info);
-      }
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.dbname = rs.getString(1);
+          info.tableName = rs.getString(2);
+          info.partName = rs.getString(3);
+          response.add(info);
+        }
 
-      // Check for aborted txns
-      s = "select tc_database, tc_table, tc_partition " +
+        // Check for aborted txns
+        s = "select tc_database, tc_table, tc_partition " +
           "from TXNS, TXN_COMPONENTS " +
           "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
           "group by tc_database, tc_table, tc_partition " +
           "having count(*) > " + maxAborted;
 
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        CompactionInfo info = new CompactionInfo();
-        info.dbname = rs.getString(1);
-        info.tableName = rs.getString(2);
-        info.partName = rs.getString(3);
-        info.tooManyAborts = true;
-        response.add(info);
-      }
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.dbname = rs.getString(1);
+          info.tableName = rs.getString(2);
+          info.partName = rs.getString(3);
+          info.tooManyAborts = true;
+          response.add(info);
+        }
 
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-    } catch (SQLException e) {
-      LOG.error("Unable to connect to transaction database " + e.getMessage());
-    } finally {
-      closeDbConn(dbConn);
-      closeStmt(stmt);
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database " + 
e.getMessage());
+        checkRetryable(dbConn, e, "findPotentialCompactions");
+      } finally {
+        closeDbConn(dbConn);
+        closeStmt(stmt);
+      }
+      return response;
+    }
+    catch (RetryException e) {
+      return findPotentialCompactions(maxAborted);
     }
-    return response;
   }
 
   /**
@@ -107,35 +114,31 @@ public class CompactionTxnHandler extend
    */
   public void setRunAs(long cq_id, String user) throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
-       stmt = dbConn.createStatement();
-       String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' 
where cq_id = " + cq_id;
-       LOG.debug("Going to execute update <" + s + ">");
-       if (stmt.executeUpdate(s) != 1) {
-         LOG.error("Unable to update compaction record");
-         LOG.debug("Going to rollback");
-         dbConn.rollback();
-       }
-       LOG.debug("Going to commit");
-       dbConn.commit();
-     } catch (SQLException e) {
-       LOG.error("Unable to update compaction queue, " + e.getMessage());
-       try {
-         LOG.debug("Going to rollback");
-         dbConn.rollback();
-       } catch (SQLException e1) {
-       }
-       detectDeadlock(dbConn, e, "setRunAs");
-     } finally {
-       closeDbConn(dbConn);
-       closeStmt(stmt);
-     }
-    } catch (DeadlockException e) {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' 
where cq_id = " + cq_id;
+        LOG.debug("Going to execute update <" + s + ">");
+        if (stmt.executeUpdate(s) != 1) {
+          LOG.error("Unable to update compaction record");
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to update compaction queue, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "setRunAs");
+      } finally {
+        closeDbConn(dbConn);
+        closeStmt(stmt);
+      }
+    } catch (RetryException e) {
       setRunAs(cq_id, user);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -147,14 +150,15 @@ public class CompactionTxnHandler extend
    */
   public CompactionInfo findNextToCompact(String workerId) throws 
MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       CompactionInfo info = new CompactionInfo();
 
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-            "cq_type from COMPACTION_QUEUE where cq_state = '" + 
INITIATED_STATE + "'";
+          "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE 
+ "'";
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -175,7 +179,7 @@ public class CompactionTxnHandler extend
         // Now, update this record as being worked on by this worker.
         long now = getDbTime(dbConn);
         s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
-            "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where 
cq_id = " + info.id;
+          "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where 
cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         if (stmt.executeUpdate(s) != 1) {
           LOG.error("Unable to update compaction record");
@@ -187,38 +191,34 @@ public class CompactionTxnHandler extend
         return info;
       } catch (SQLException e) {
         LOG.error("Unable to select next element for compaction, " + 
e.getMessage());
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "findNextToCompact");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findNextToCompact");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return findNextToCompact(workerId);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   /**
    * This will mark an entry in the queue as compacted
    * and put it in the ready to clean state.
-   * @param info info on the compaciton entry to mark as compacted.
+   * @param info info on the compaction entry to mark as compacted.
    */
   public void markCompacted(CompactionInfo info) throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_state = '" + 
READY_FOR_CLEANING + "', " +
-            "cq_worker_id = null where cq_id = " + info.id;
+          "cq_worker_id = null where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         if (stmt.executeUpdate(s) != 1) {
           LOG.error("Unable to update compaction record");
@@ -228,23 +228,18 @@ public class CompactionTxnHandler extend
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.error("Unable to update compaction queue " + e.getMessage());
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "markCompacted");
+        LOG.error("Unable to update compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "markCompacted");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       markCompacted(info);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -254,45 +249,48 @@ public class CompactionTxnHandler extend
    * @return information on the entry in the queue.
    */
   public List<CompactionInfo> findReadyToClean() throws MetaException {
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Connection dbConn = null;
     List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
 
     Statement stmt = null;
     try {
-      stmt = dbConn.createStatement();
-      String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
           "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + 
READY_FOR_CLEANING + "'";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        CompactionInfo info = new CompactionInfo();
-        info.id = rs.getLong(1);
-        info.dbname = rs.getString(2);
-        info.tableName = rs.getString(3);
-        info.partName = rs.getString(4);
-        switch (rs.getString(5).charAt(0)) {
-          case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
-          case MINOR_TYPE: info.type = CompactionType.MINOR; break;
-          default: throw new MetaException("Unexpected compaction type " + 
rs.getString(5));
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.id = rs.getLong(1);
+          info.dbname = rs.getString(2);
+          info.tableName = rs.getString(3);
+          info.partName = rs.getString(4);
+          switch (rs.getString(5).charAt(0)) {
+            case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+            case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+            default: throw new MetaException("Unexpected compaction type " + 
rs.getString(5));
+          }
+          info.runAs = rs.getString(6);
+          rc.add(info);
         }
-        info.runAs = rs.getString(6);
-        rc.add(info);
-      }
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      return rc;
-    } catch (SQLException e) {
-      LOG.error("Unable to select next element for cleaning, " + 
e.getMessage());
-      try {
         LOG.debug("Going to rollback");
         dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to connect to transaction database " +
+        return rc;
+      } catch (SQLException e) {
+        LOG.error("Unable to select next element for cleaning, " + 
e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findReadyToClean");
+        throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
-    } finally {
-      closeDbConn(dbConn);
-      closeStmt(stmt);
+      } finally {
+        closeDbConn(dbConn);
+        closeStmt(stmt);
+      }
+    } catch (RetryException e) {
+      return findReadyToClean();
     }
   }
 
@@ -303,9 +301,10 @@ public class CompactionTxnHandler extend
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
@@ -318,20 +317,20 @@ public class CompactionTxnHandler extend
         // Remove entries from completed_txn_components as well, so we don't 
start looking there
         // again.
         s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + 
info.dbname + "' and " +
-            "ctc_table = '" + info.tableName + "'";
+          "ctc_table = '" + info.tableName + "'";
         if (info.partName != null) {
           s += " and ctc_partition = '" + info.partName + "'";
         }
         LOG.debug("Going to execute update <" + s + ">");
         if (stmt.executeUpdate(s) < 1) {
           LOG.error("Expected to remove at least one row from 
completed_txn_components when " +
-              "marking compaction entry as clean!");
+            "marking compaction entry as clean!");
         }
 
 
         s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid 
and txn_state = '" +
-            TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and 
tc_table = '" +
-            info.tableName + "'";
+          TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and 
tc_table = '" +
+          info.tableName + "'";
         if (info.partName != null) s += " and tc_partition = '" + 
info.partName + "'";
         LOG.debug("Going to execute update <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
@@ -371,23 +370,18 @@ public class CompactionTxnHandler extend
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.error("Unable to delete from compaction queue " + 
e.getMessage());
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "markCleaned");
+        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "markCleaned");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       markCleaned(info);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -396,13 +390,14 @@ public class CompactionTxnHandler extend
    */
   public void cleanEmptyAbortedTxns() throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "select txn_id from TXNS where " +
-            "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
-            "txn_state = '" + TXN_ABORTED + "'";
+          "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
+          "txn_state = '" + TXN_ABORTED + "'";
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         Set<Long> txnids = new HashSet<Long>();
@@ -425,21 +420,16 @@ public class CompactionTxnHandler extend
       } catch (SQLException e) {
         LOG.error("Unable to delete from txns table " + e.getMessage());
         LOG.debug("Going to rollback");
-        try {
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       cleanEmptyAbortedTxns();
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -454,13 +444,14 @@ public class CompactionTxnHandler extend
    */
   public void revokeFromLocalWorkers(String hostname) throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
-            + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_worker_id like '"
-            +  hostname + "%'";
+          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_worker_id like '"
+          +  hostname + "%'";
         LOG.debug("Going to execute update <" + s + ">");
         // It isn't an error if the following returns no rows, as the local 
workers could have died
         // with  nothing assigned to them.
@@ -468,24 +459,19 @@ public class CompactionTxnHandler extend
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.error("Unable to change dead worker's records back to initiated 
state " +
-              e.getMessage());
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "revokeFromLocalWorkers");
+        LOG.error("Unable to change dead worker's records back to initiated 
state " +
+          e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "revokeFromLocalWorkers");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       revokeFromLocalWorkers(hostname);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -500,14 +486,15 @@ public class CompactionTxnHandler extend
    */
   public void revokeTimedoutWorkers(long timeout) throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-      long latestValidStart = getDbTime(dbConn) - timeout;
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        long latestValidStart = getDbTime(dbConn) - timeout;
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
-            + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_start < "
-            +  latestValidStart;
+          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_start < "
+          +  latestValidStart;
         LOG.debug("Going to execute update <" + s + ">");
         // It isn't an error if the following returns no rows, as the local 
workers could have died
         // with  nothing assigned to them.
@@ -515,24 +502,19 @@ public class CompactionTxnHandler extend
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.error("Unable to change dead worker's records back to initiated 
state " +
-              e.getMessage());
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "revokeTimedoutWorkers");
+        LOG.error("Unable to change dead worker's records back to initiated 
state " +
+          e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "revokeTimedoutWorkers");
         throw new MetaException("Unable to connect to transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
         closeStmt(stmt);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       revokeTimedoutWorkers(timeout);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
@@ -543,53 +525,55 @@ public class CompactionTxnHandler extend
    * @throws MetaException
    */
   public List<String> findColumnsWithStats(CompactionInfo ci) throws 
MetaException {
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Connection dbConn = null;
     Statement stmt = null;
     ResultSet rs = null;
     try {
-      String quote = getIdentifierQuoteString(dbConn);
-      stmt = dbConn.createStatement();
-      StringBuilder bldr = new StringBuilder();
-      bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String quote = getIdentifierQuoteString(dbConn);
+        stmt = dbConn.createStatement();
+        StringBuilder bldr = new StringBuilder();
+        bldr.append("SELECT 
").append(quote).append("COLUMN_NAME").append(quote)
           .append(" FROM ")
           .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : 
"PART_COL_STATS"))
-              .append(quote)
+          .append(quote)
           .append(" WHERE ")
           .append(quote).append("DB_NAME").append(quote).append(" = 
'").append(ci.dbname)
-              .append("' AND 
").append(quote).append("TABLE_NAME").append(quote)
-              .append(" = '").append(ci.tableName).append("'");
-      if (ci.partName != null) {
-        bldr.append(" AND 
").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
+          .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
+          .append(" = '").append(ci.tableName).append("'");
+        if (ci.partName != null) {
+          bldr.append(" AND 
").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
             .append(ci.partName).append("'");
-      }
-      String s = bldr.toString();
+        }
+        String s = bldr.toString();
 
       /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? 
"TAB_COL_STATS" :
           "PART_COL_STATS")
          + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + 
ci.tableName + "'"
         + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + 
"'");*/
-      LOG.debug("Going to execute <" + s + ">");
-      rs = stmt.executeQuery(s);
-      List<String> columns = new ArrayList<String>();
-      while(rs.next()) {
-        columns.add(rs.getString(1));
-      }
-      LOG.debug("Found columns to update stats: " + columns + " on " + 
ci.tableName +
-        (ci.partName == null ? "" : "/" + ci.partName));
-      dbConn.commit();
-      return columns;
-    } catch (SQLException e) {
-      try {
+        LOG.debug("Going to execute <" + s + ">");
+        rs = stmt.executeQuery(s);
+        List<String> columns = new ArrayList<String>();
+        while (rs.next()) {
+          columns.add(rs.getString(1));
+        }
+        LOG.debug("Found columns to update stats: " + columns + " on " + 
ci.tableName +
+          (ci.partName == null ? "" : "/" + ci.partName));
+        dbConn.commit();
+        return columns;
+      } catch (SQLException e) {
         LOG.error("Failed to find columns to analyze stats on for " + 
ci.tableName +
-            (ci.partName == null ? "" : "/" + ci.partName), e);
-        dbConn.rollback();
-      } catch (SQLException e1) {
-        //nothing we can do here
+          (ci.partName == null ? "" : "/" + ci.partName), e);
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findColumnsWithStats");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
       }
-      throw new MetaException("Unable to connect to transaction database " +
-        StringUtils.stringifyException(e));
-    } finally {
-      close(rs, stmt, dbConn);
+    } catch (RetryException ex) {
+      return findColumnsWithStats(ci);
     }
   }
 }


Reply via email to