http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
deleted file mode 100644
index 60839fa..0000000
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ /dev/null
@@ -1,960 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Extends the transaction handler with methods needed only by the compactor 
threads.  These
- * methods are not available through the thrift interface.
- */
-class CompactionTxnHandler extends TxnHandler {
-  static final private String CLASS_NAME = 
CompactionTxnHandler.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
-  public CompactionTxnHandler() {
-  }
-
-  /**
-   * This will look through the completed_txn_components table and look for 
partitions or tables
-   * that may be ready for compaction.  Also, look through txns and 
txn_components tables for
-   * aborted transactions that we should add to the list.
-   * @param maxAborted Maximum number of aborted queries to allow before 
marking this as a
-   *                   potential compaction.
-   * @return list of CompactionInfo structs.  These will not have id, type,
-   * or runAs set since these are only potential compactions not actual ones.
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException {
-    Connection dbConn = null;
-    Set<CompactionInfo> response = new HashSet<CompactionInfo>();
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        // Check for completed transactions
-        String s = "select distinct ctc_database, ctc_table, " +
-          "ctc_partition from COMPLETED_TXN_COMPONENTS";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.dbname = rs.getString(1);
-          info.tableName = rs.getString(2);
-          info.partName = rs.getString(3);
-          response.add(info);
-        }
-        rs.close();
-
-        // Check for aborted txns
-        s = "select tc_database, tc_table, tc_partition " +
-          "from TXNS, TXN_COMPONENTS " +
-          "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
-          "group by tc_database, tc_table, tc_partition " +
-          "having count(*) > " + maxAborted;
-
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.dbname = rs.getString(1);
-          info.tableName = rs.getString(2);
-          info.partName = rs.getString(3);
-          info.tooManyAborts = true;
-          response.add(info);
-        }
-
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-      } catch (SQLException e) {
-        LOG.error("Unable to connect to transaction database " + 
e.getMessage());
-        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + 
maxAborted + ")");
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-      return response;
-    }
-    catch (RetryException e) {
-      return findPotentialCompactions(maxAborted);
-    }
-  }
-
-  /**
-   * Sets the user to run as.  This is for the case
-   * where the request was generated by the user and so the worker must set 
this value later.
-   * @param cq_id id of this entry in the queue
-   * @param user user to run the jobs as
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void setRunAs(long cq_id, String user) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' 
where cq_id = " + cq_id;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updCnt = stmt.executeUpdate(s);
-        if (updCnt != 1) {
-          LOG.error("Unable to set cq_run_as=" + user + " for compaction 
record with cq_id=" + cq_id + ".  updCnt=" + updCnt);
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to update compaction queue, " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user 
+")");
-      } finally {
-        closeDbConn(dbConn);
-        closeStmt(stmt);
-      }
-    } catch (RetryException e) {
-      setRunAs(cq_id, user);
-    }
-  }
-
-  /**
-   * This will grab the next compaction request off of
-   * the queue, and assign it to the worker.
-   * @param workerId id of the worker calling this, will be recorded in the db
-   * @return an info element for this compaction request, or null if there is 
no work to do now.
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public CompactionInfo findNextToCompact(String workerId) throws 
MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      //need a separate stmt for executeUpdate() otherwise it will close the 
ResultSet(HIVE-12725)
-      Statement updStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" 
+ INITIATED_STATE + "'";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("No compactions found ready to compact");
-          dbConn.rollback();
-          return null;
-        }
-        updStmt = dbConn.createStatement();
-        do {
-          CompactionInfo info = new CompactionInfo();
-          info.id = rs.getLong(1);
-          info.dbname = rs.getString(2);
-          info.tableName = rs.getString(3);
-          info.partName = rs.getString(4);
-          info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
-          info.properties = rs.getString(6);
-          // Now, update this record as being worked on by this worker.
-          long now = getDbTime(dbConn);
-          s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', 
" +
-            "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where 
cq_id = " + info.id +
-            " AND cq_state='" + INITIATED_STATE + "'";
-          LOG.debug("Going to execute update <" + s + ">");
-          int updCount = updStmt.executeUpdate(s);
-          if(updCount == 1) {
-            dbConn.commit();
-            return info;
-          }
-          if(updCount == 0) {
-            LOG.debug("Another Worker picked up " + info);
-            continue;
-          }
-          LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for 
compaction record: " +
-            info + ". updCnt=" + updCount + ".");
-          dbConn.rollback();
-          return null;
-        } while( rs.next());
-        dbConn.rollback();
-        return null;
-      } catch (SQLException e) {
-        LOG.error("Unable to select next element for compaction, " + 
e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + 
")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(updStmt);
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return findNextToCompact(workerId);
-    }
-  }
-
-  /**
-   * This will mark an entry in the queue as compacted
-   * and put it in the ready to clean state.
-   * @param info info on the compaction entry to mark as compacted.
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void markCompacted(CompactionInfo info) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set cq_state = '" + 
READY_FOR_CLEANING + "', " +
-          "cq_worker_id = null where cq_id = " + info.id;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updCnt = stmt.executeUpdate(s);
-        if (updCnt != 1) {
-          LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for 
compaction record: " + info + ". updCnt=" + updCnt);
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to update compaction queue " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "markCompacted(" + info + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      markCompacted(info);
-    }
-  }
-
-  /**
-   * Find entries in the queue that are ready to
-   * be cleaned.
-   * @return information on the entry in the queue.
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public List<CompactionInfo> findReadyToClean() throws MetaException {
-    Connection dbConn = null;
-    List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
-
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where 
cq_state = '" + READY_FOR_CLEANING + "'";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.id = rs.getLong(1);
-          info.dbname = rs.getString(2);
-          info.tableName = rs.getString(3);
-          info.partName = rs.getString(4);
-          switch (rs.getString(5).charAt(0)) {
-            case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
-            case MINOR_TYPE: info.type = CompactionType.MINOR; break;
-            default: throw new MetaException("Unexpected compaction type " + 
rs.getString(5));
-          }
-          info.runAs = rs.getString(6);
-          info.highestTxnId = rs.getLong(7);
-          rc.add(info);
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        return rc;
-      } catch (SQLException e) {
-        LOG.error("Unable to select next element for cleaning, " + 
e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "findReadyToClean");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return findReadyToClean();
-    }
-  }
-
-  /**
-   * This will remove an entry from the queue after
-   * it has been compacted.
-   * 
-   * @param info info on the compaction entry to remove
-   */
-  @Override
-  @RetrySemantics.CannotRetry
-  public void markCleaned(CompactionInfo info) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
-        if(rs.next()) {
-          info = CompactionInfo.loadFullFromCompactionQueue(rs);
-        }
-        else {
-          throw new IllegalStateException("No record with CQ_ID=" + info.id + 
" found in COMPACTION_QUEUE");
-        }
-        close(rs);
-        String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
-        LOG.debug("Going to execute update <" + s + ">");
-        int updCount = stmt.executeUpdate(s);
-        if (updCount != 1) {
-          LOG.error("Unable to delete compaction record: " + info +  ".  
Update count=" + updCount);
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        }
-        pStmt = dbConn.prepareStatement("insert into 
COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, 
CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, 
CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, 
?,?,?,?)");
-        info.state = SUCCEEDED_STATE;
-        CompactionInfo.insertIntoCompletedCompactions(pStmt, info, 
getDbTime(dbConn));
-        updCount = pStmt.executeUpdate();
-
-        // Remove entries from completed_txn_components as well, so we don't 
start looking there
-        // again but only up to the highest txn ID include in this compaction 
job.
-        //highestTxnId will be NULL in upgrade scenarios
-        s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + 
info.dbname + "' and " +
-          "ctc_table = '" + info.tableName + "'";
-        if (info.partName != null) {
-          s += " and ctc_partition = '" + info.partName + "'";
-        }
-        if(info.highestTxnId != 0) {
-          s += " and ctc_txnid <= " + info.highestTxnId;
-        }
-        LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) < 1) {
-          LOG.error("Expected to remove at least one row from 
completed_txn_components when " +
-            "marking compaction entry as clean!");
-        }
-
-        s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = 
tc_txnid and txn_state = '" +
-          TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and 
tc_table = '" +
-          info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id 
<= " + info.highestTxnId);
-        if (info.partName != null) s += " and tc_partition = '" + 
info.partName + "'";
-        LOG.debug("Going to execute update <" + s + ">");
-        rs = stmt.executeQuery(s);
-        List<Long> txnids = new ArrayList<>();
-        while (rs.next()) txnids.add(rs.getLong(1));
-        // Remove entries from txn_components, as there may be aborted txn 
components
-        if (txnids.size() > 0) {
-          List<String> queries = new ArrayList<String>();
-
-          // Prepare prefix and suffix
-          StringBuilder prefix = new StringBuilder();
-          StringBuilder suffix = new StringBuilder();
-
-          prefix.append("delete from TXN_COMPONENTS where ");
-
-          //because 1 txn may include different partitions/tables even in auto 
commit mode
-          suffix.append(" and tc_database = ");
-          suffix.append(quoteString(info.dbname));
-          suffix.append(" and tc_table = ");
-          suffix.append(quoteString(info.tableName));
-          if (info.partName != null) {
-            suffix.append(" and tc_partition = ");
-            suffix.append(quoteString(info.partName));
-          }
-
-          // Populate the complete query with provided prefix and suffix
-          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
txnids, "tc_txnid", true, false);
-
-          for (String query : queries) {
-            LOG.debug("Going to execute update <" + query + ">");
-            int rc = stmt.executeUpdate(query);
-            LOG.debug("Removed " + rc + " records from txn_components");
-
-            // Don't bother cleaning from the txns table.  A separate call 
will do that.  We don't
-            // know here which txns still have components from other tables or 
partitions in the
-            // table, so we don't know which ones we can and cannot clean.
-          }
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "markCleaned(" + info + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(pStmt);
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      markCleaned(info);
-    }
-  }
-
-  /**
-   * Clean up aborted transactions from txns that have no components in 
txn_components.  The reason such
-   * txns exist can be that now work was done in this txn (e.g. Streaming 
opened TransactionBatch and
-   * abandoned it w/o doing any work) or due to {@link 
#markCleaned(CompactionInfo)} being called.
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void cleanEmptyAbortedTxns() throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        //Aborted is a terminal state, so nothing about the txn can change
-        //after that, so READ COMMITTED is sufficient.
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select txn_id from TXNS where " +
-          "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
-          "txn_state = '" + TXN_ABORTED + "'";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        List<Long> txnids = new ArrayList<>();
-        while (rs.next()) txnids.add(rs.getLong(1));
-        close(rs);
-        if(txnids.size() <= 0) {
-          return;
-        }
-        Collections.sort(txnids);//easier to read logs
-        List<String> queries = new ArrayList<String>();
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        prefix.append("delete from TXNS where ");
-        suffix.append("");
-
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, 
"txn_id", false, false);
-
-        for (String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          int rc = stmt.executeUpdate(query);
-          LOG.info("Removed " + rc + "  empty Aborted transactions from TXNS");
-        }
-        LOG.info("Aborted transactions removed from TXNS: " + txnids);
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to delete from txns table " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      cleanEmptyAbortedTxns();
-    }
-  }
-
-  /**
-   * This will take all entries assigned to workers
-   * on a host return them to INITIATED state.  The initiator should use this 
at start up to
-   * clean entries from any workers that were in the middle of compacting when 
the metastore
-   * shutdown.  It does not reset entries from worker threads on other hosts 
as those may still
-   * be working.
-   * @param hostname Name of this host.  It is assumed this prefixes the 
thread's worker id,
-   *                 so that like hostname% will match the worker id.
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void revokeFromLocalWorkers(String hostname) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
-          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_worker_id like '"
-          +  hostname + "%'";
-        LOG.debug("Going to execute update <" + s + ">");
-        // It isn't an error if the following returns no rows, as the local 
workers could have died
-        // with  nothing assigned to them.
-        stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to change dead worker's records back to initiated 
state " +
-          e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + 
hostname +")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      revokeFromLocalWorkers(hostname);
-    }
-  }
-
-  /**
-   * This call will return all compaction queue
-   * entries assigned to a worker but over the timeout back to the initiated 
state.
-   * This should be called by the initiator on start up and occasionally when 
running to clean up
-   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} 
should be called
-   * first.
-   * @param timeout number of milliseconds since start time that should elapse 
before a worker is
-   *                declared dead.
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void revokeTimedoutWorkers(long timeout) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        long latestValidStart = getDbTime(dbConn) - timeout;
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
-          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_start < "
-          +  latestValidStart;
-        LOG.debug("Going to execute update <" + s + ">");
-        // It isn't an error if the following returns no rows, as the local 
workers could have died
-        // with  nothing assigned to them.
-        stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to change dead worker's records back to initiated 
state " +
-          e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + 
")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      revokeTimedoutWorkers(timeout);
-    }
-  }
-
-  /**
-   * Queries metastore DB directly to find columns in the table which have 
statistics information.
-   * If {@code ci} includes partition info then per partition stats info is 
examined, otherwise
-   * table level stats are examined.
-   * @throws MetaException
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public List<String> findColumnsWithStats(CompactionInfo ci) throws 
MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        String quote = getIdentifierQuoteString(dbConn);
-        stmt = dbConn.createStatement();
-        StringBuilder bldr = new StringBuilder();
-        bldr.append("SELECT 
").append(quote).append("COLUMN_NAME").append(quote)
-          .append(" FROM ")
-          .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : 
"PART_COL_STATS"))
-          .append(quote)
-          .append(" WHERE ")
-          .append(quote).append("DB_NAME").append(quote).append(" = 
'").append(ci.dbname)
-          .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
-          .append(" = '").append(ci.tableName).append("'");
-        if (ci.partName != null) {
-          bldr.append(" AND 
").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
-            .append(ci.partName).append("'");
-        }
-        String s = bldr.toString();
-
-      /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? 
"TAB_COL_STATS" :
-          "PART_COL_STATS")
-         + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + 
ci.tableName + "'"
-        + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + 
"'");*/
-        LOG.debug("Going to execute <" + s + ">");
-        rs = stmt.executeQuery(s);
-        List<String> columns = new ArrayList<String>();
-        while (rs.next()) {
-          columns.add(rs.getString(1));
-        }
-        LOG.debug("Found columns to update stats: " + columns + " on " + 
ci.tableName +
-          (ci.partName == null ? "" : "/" + ci.partName));
-        dbConn.commit();
-        return columns;
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
-          (ci.partName == null ? "" : "/" + ci.partName) + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException ex) {
-      return findColumnsWithStats(ci);
-    }
-  }
-
-  /**
-   * Record the highest txn id that the {@code ci} compaction job will pay 
attention to.
-   * This is the highest resolved txn id, i.e. such that there are no open 
txns with lower ids.
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) 
throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET 
CQ_HIGHEST_TXN_ID = " + highestTxnId +
-          " WHERE CQ_ID = " + ci.id);
-        if(updCount != 1) {
-          throw new IllegalStateException("Could not find record in 
COMPACTION_QUEUE for " + ci);
-        }
-        dbConn.commit();
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + 
highestTxnId + ")");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException ex) {
-      setCompactionHighestTxnId(ci, highestTxnId);
-    }
-  }
-  private static class RetentionCounters {
-    int attemptedRetention = 0;
-    int failedRetention = 0;
-    int succeededRetention = 0;
-    RetentionCounters(int attemptedRetention, int failedRetention, int 
succeededRetention) {
-      this.attemptedRetention = attemptedRetention;
-      this.failedRetention = failedRetention;
-      this.succeededRetention = succeededRetention;
-    }
-  }
-  private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, 
RetentionCounters rc) {
-    switch (ci.state) {
-      case ATTEMPTED_STATE:
-        if(--rc.attemptedRetention < 0) {
-          deleteSet.add(ci.id);
-        }
-        break;
-      case FAILED_STATE:
-        if(--rc.failedRetention < 0) {
-          deleteSet.add(ci.id);
-        }
-        break;
-      case SUCCEEDED_STATE:
-        if(--rc.succeededRetention < 0) {
-          deleteSet.add(ci.id);
-        }
-        break;
-      default:
-        //do nothing to hanlde future RU/D where we may want to add new state 
types
-    }
-  }
-
-  /**
-   * For any given compactable entity (partition; table if not partitioned) 
the history of compactions
-   * may look like "sssfffaaasffss", for example.  The idea is to retain the 
tail (most recent) of the
-   * history such that a configurable number of each type of state is present. 
 Any other entries
-   * can be purged.  This scheme has advantage of always retaining the last 
failure/success even if
-   * it's not recent.
-   * @throws MetaException
-   */
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void purgeCompactionHistory() throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    List<Long> deleteSet = new ArrayList<>();
-    RetentionCounters rc = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        /*cc_id is monotonically increasing so for any entity sorts in order 
of compaction history,
-        thus this query groups by entity and withing group sorts most recent 
first*/
-        rs = stmt.executeQuery("select cc_id, cc_database, cc_table, 
cc_partition, cc_state from " +
-          "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, 
cc_id desc");
-        String lastCompactedEntity = null;
-        /*In each group, walk from most recent and count occurences of each 
state type.  Once you
-        * have counted enough (for each state) to satisfy retention policy, 
delete all other
-        * instances of this status.*/
-        while(rs.next()) {
-          CompactionInfo ci = new CompactionInfo(rs.getLong(1), 
rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
-          if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
-            lastCompactedEntity = ci.getFullPartitionName();
-            rc = new 
RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
-              getFailedCompactionRetention(),
-              
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
-          }
-          checkForDeletion(deleteSet, ci, rc);
-        }
-        close(rs);
-
-        if (deleteSet.size() <= 0) {
-          return;
-        }
-
-        List<String> queries = new ArrayList<String>();
-
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        prefix.append("delete from COMPLETED_COMPACTIONS where ");
-        suffix.append("");
-
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
deleteSet, "cc_id", false, false);
-
-        for (String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          int count = stmt.executeUpdate(query);
-          LOG.debug("Removed " + count + " records from 
COMPLETED_COMPACTIONS");
-        }
-        dbConn.commit();
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "purgeCompactionHistory()");
-        throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException ex) {
-      purgeCompactionHistory();
-    }
-  }
-  /**
-   * this ensures that the number of failed compaction entries retained is > 
than number of failed
-   * compaction threshold which prevents new compactions from being scheduled.
-   */
-  private int getFailedCompactionRetention() {
-    int failedThreshold = 
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-    int failedRetention = 
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
-    if(failedRetention < failedThreshold) {
-      LOG.warn("Invalid configuration " + 
HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
-        "=" + failedRetention + " < " + 
HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" +
-        failedRetention + ".  Will use " + 
HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
-        "=" + failedRetention);
-      failedRetention = failedThreshold;
-    }
-    return failedRetention;
-  }
-  /**
-   * Returns {@code true} if there already exists sufficient number of 
consecutive failures for
-   * this table/partition so that no new automatic compactions will be 
scheduled.
-   * User initiated compactions don't do this check.
-   *
-   * Do we allow compacting whole table (when it's partitioned)?  No, though 
perhaps we should.
-   * That would be a meta operations, i.e. first find all partitions for this 
table (which have 
-   * txn info) and schedule each compaction separately.  This avoids 
complications in this logic.
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public boolean checkFailedCompactions(CompactionInfo ci) throws 
MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS 
where " +
-          "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
-          "CC_TABLE = " + quoteString(ci.tableName) +
-          (ci.partName != null ? "and CC_PARTITION = " + 
quoteString(ci.partName) : "") +
-          " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID 
desc");
-        int numFailed = 0;
-        int numTotal = 0;
-        int failedThreshold = 
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-        while(rs.next() && ++numTotal <= failedThreshold) {
-          if(rs.getString(1).charAt(0) == FAILED_STATE) {
-            numFailed++;
-          }
-          else {
-            numFailed--;
-          }
-        }
-        return numFailed == failedThreshold;
-      }
-      catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
-        LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(e));
-        return false;//weren't able to check
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return checkFailedCompactions(ci);
-    }
-  }
-  /**
-   * If there is an entry in compaction_queue with ci.id, remove it
-   * Make entry in completed_compactions with status 'f'.
-   * If there is no entry in compaction_queue, it means Initiator failed to 
even schedule a compaction,
-   * which we record as ATTEMPTED_STATE entry in history.
-   */
-  @Override
-  @RetrySemantics.CannotRetry
-  public void markFailed(CompactionInfo ci) throws MetaException {//todo: this 
should not throw
-    //todo: this should take "comment" as parameter to set in CC_META_INFO to 
provide some context for the failure
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
-        if(rs.next()) {
-          ci = CompactionInfo.loadFullFromCompactionQueue(rs);
-          String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
-          LOG.debug("Going to execute update <" + s + ">");
-          int updCnt = stmt.executeUpdate(s);
-        }
-        else {
-          if(ci.id > 0) {
-            //the record with valid CQ_ID has disappeared - this is a sign of 
something wrong
-            throw new IllegalStateException("No record with CQ_ID=" + ci.id + 
" found in COMPACTION_QUEUE");
-          }
-        }
-        if(ci.id == 0) {
-          //The failure occurred before we even made an entry in 
COMPACTION_QUEUE
-          //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
-          ci.id = generateCompactionQueueId(stmt);
-          //mostly this indicates that the Initiator is paying attention to 
some table even though
-          //compactions are not happening.
-          ci.state = ATTEMPTED_STATE;
-          //this is not strictly accurate, but 'type' cannot be null.
-          if(ci.type == null) { ci.type = CompactionType.MINOR; }
-          ci.start = getDbTime(dbConn);
-        }
-        else {
-          ci.state = FAILED_STATE;
-        }
-        close(rs, stmt, null);
-
-        pStmt = dbConn.prepareStatement("insert into 
COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, 
CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, 
CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, 
?,?,?,?)");
-        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, 
getDbTime(dbConn));
-        int updCount = pStmt.executeUpdate();
-        LOG.debug("Going to commit");
-        closeStmt(pStmt);
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        try {
-          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
-        }
-        catch(MetaException ex) {
-          LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(ex));
-        }
-        LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
-      } finally {
-        close(rs, stmt, null);
-        close(null, pStmt, dbConn);
-      }
-    } catch (RetryException e) {
-      markFailed(ci);
-    }
-  }
-  @Override
-  @RetrySemantics.Idempotent
-  public void setHadoopJobId(String hadoopJobId, long id) {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + 
quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
-        LOG.debug("Going to execute <" + s + ">");
-        int updateCount = stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        closeStmt(stmt);
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + 
e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        try {
-          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id 
+ ")");
-        }
-        catch(MetaException ex) {
-          LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(ex));
-        }
-        LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + 
e.getMessage(), e);
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      setHadoopJobId(hadoopJobId, id);
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
deleted file mode 100644
index 0161894..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.SQLTransactionRollbackException;
-import java.sql.Statement;
-import java.util.Properties;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-/**
- * Utility methods for creating and destroying txn database/schema, plus 
methods for
- * querying against metastore tables.
- * Placed here in a separate class so it can be shared across unit tests.
- */
-public final class TxnDbUtil {
-
-  static final private Logger LOG = 
LoggerFactory.getLogger(TxnDbUtil.class.getName());
-  private static final String TXN_MANAGER = 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
-
-  private static int deadlockCnt = 0;
-
-  private TxnDbUtil() {
-    throw new UnsupportedOperationException("Can't initialize class");
-  }
-
-  /**
-   * Set up the configuration so it will use the DbTxnManager, concurrency 
will be set to true,
-   * and the JDBC configs will be set for putting the transaction and lock 
info in the embedded
-   * metastore.
-   *
-   * @param conf HiveConf to add these values to
-   */
-  public static void setConfValues(HiveConf conf) {
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
-  }
-
-  public static void prepDb() throws Exception {
-    // This is a bogus hack because it copies the contents of the SQL file
-    // intended for creating derby databases, and thus will inexorably get
-    // out of date with it.  I'm open to any suggestions on how to make this
-    // read the file in a build friendly way.
-
-    Connection conn = null;
-    Statement stmt = null;
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-      stmt.execute("CREATE TABLE TXNS (" +
-          "  TXN_ID bigint PRIMARY KEY," +
-          "  TXN_STATE char(1) NOT NULL," +
-          "  TXN_STARTED bigint NOT NULL," +
-          "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
-          "  TXN_USER varchar(128) NOT NULL," +
-          "  TXN_HOST varchar(128) NOT NULL)");
-
-      stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
-          "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
-          "  TC_DATABASE varchar(128) NOT NULL," +
-          "  TC_TABLE varchar(128)," +
-          "  TC_PARTITION varchar(767)," +
-          "  TC_OPERATION_TYPE char(1) NOT NULL)");
-      stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
-          "  CTC_TXNID bigint," +
-          "  CTC_DATABASE varchar(128) NOT NULL," +
-          "  CTC_TABLE varchar(128)," +
-          "  CTC_PARTITION varchar(767))");
-      stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT 
NULL)");
-      stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
-      stmt.execute("CREATE TABLE HIVE_LOCKS (" +
-          " HL_LOCK_EXT_ID bigint NOT NULL," +
-          " HL_LOCK_INT_ID bigint NOT NULL," +
-          " HL_TXNID bigint," +
-          " HL_DB varchar(128) NOT NULL," +
-          " HL_TABLE varchar(128)," +
-          " HL_PARTITION varchar(767)," +
-          " HL_LOCK_STATE char(1) NOT NULL," +
-          " HL_LOCK_TYPE char(1) NOT NULL," +
-          " HL_LAST_HEARTBEAT bigint NOT NULL," +
-          " HL_ACQUIRED_AT bigint," +
-          " HL_USER varchar(128) NOT NULL," +
-          " HL_HOST varchar(128) NOT NULL," +
-          " HL_HEARTBEAT_COUNT integer," +
-          " HL_AGENT_INFO varchar(128)," +
-          " HL_BLOCKEDBY_EXT_ID bigint," +
-          " HL_BLOCKEDBY_INT_ID bigint," +
-        " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
-      stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
-
-      stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT 
NULL)");
-      stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
-
-      stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
-          " CQ_ID bigint PRIMARY KEY," +
-          " CQ_DATABASE varchar(128) NOT NULL," +
-          " CQ_TABLE varchar(128) NOT NULL," +
-          " CQ_PARTITION varchar(767)," +
-          " CQ_STATE char(1) NOT NULL," +
-          " CQ_TYPE char(1) NOT NULL," +
-          " CQ_TBLPROPERTIES varchar(2048)," +
-          " CQ_WORKER_ID varchar(128)," +
-          " CQ_START bigint," +
-          " CQ_RUN_AS varchar(128)," +
-          " CQ_HIGHEST_TXN_ID bigint," +
-          " CQ_META_INFO varchar(2048) for bit data," +
-          " CQ_HADOOP_JOB_ID varchar(32))");
-
-      stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT 
NULL)");
-      stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
-      
-      stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
-        " CC_ID bigint PRIMARY KEY," +
-        " CC_DATABASE varchar(128) NOT NULL," +
-        " CC_TABLE varchar(128) NOT NULL," +
-        " CC_PARTITION varchar(767)," +
-        " CC_STATE char(1) NOT NULL," +
-        " CC_TYPE char(1) NOT NULL," +
-        " CC_TBLPROPERTIES varchar(2048)," +
-        " CC_WORKER_ID varchar(128)," +
-        " CC_START bigint," +
-        " CC_END bigint," +
-        " CC_RUN_AS varchar(128)," +
-        " CC_HIGHEST_TXN_ID bigint," +
-        " CC_META_INFO varchar(2048) for bit data," +
-        " CC_HADOOP_JOB_ID varchar(32))");
-      
-      stmt.execute("CREATE TABLE AUX_TABLE (" +
-        " MT_KEY1 varchar(128) NOT NULL," +
-        " MT_KEY2 bigint NOT NULL," +
-        " MT_COMMENT varchar(255)," +
-        " PRIMARY KEY(MT_KEY1, MT_KEY2))");
-      
-      stmt.execute("CREATE TABLE WRITE_SET (" +
-        " WS_DATABASE varchar(128) NOT NULL," +
-        " WS_TABLE varchar(128) NOT NULL," +
-        " WS_PARTITION varchar(767)," +
-        " WS_TXNID bigint NOT NULL," +
-        " WS_COMMIT_ID bigint NOT NULL," +
-        " WS_OPERATION_TYPE char(1) NOT NULL)"
-      );
-    } catch (SQLException e) {
-      try {
-        conn.rollback();
-      } catch (SQLException re) {
-        LOG.error("Error rolling back: " + re.getMessage());
-      }
-
-      // This might be a deadlock, if so, let's retry
-      if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
-        LOG.warn("Caught deadlock, retrying db creation");
-        prepDb();
-      } else {
-        throw e;
-      }
-    } finally {
-      deadlockCnt = 0;
-      closeResources(conn, stmt, null);
-    }
-  }
-
-  public static void cleanDb() throws Exception {
-    int retryCount = 0;
-    while(++retryCount <= 3) {
-      boolean success = true;
-      Connection conn = null;
-      Statement stmt = null;
-      try {
-        conn = getConnection();
-        stmt = conn.createStatement();
-
-        // We want to try these, whether they succeed or fail.
-        try {
-          stmt.execute("DROP INDEX HL_TXNID_INDEX");
-        } catch (SQLException e) {
-          if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) {
-            //42X65/3000 means index doesn't exist
-            LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() +
-              "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " 
retryCount=" + retryCount);
-            success = false;
-          }
-        }
-
-        success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
-        success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
-        success &= dropTable(stmt, "TXNS", retryCount);
-        success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
-        success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
-        success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
-        success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
-        success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
-        success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
-        success &= dropTable(stmt, "AUX_TABLE", retryCount);
-        success &= dropTable(stmt, "WRITE_SET", retryCount);
-      } finally {
-        closeResources(conn, stmt, null);
-      }
-      if(success) {
-        return;
-      }
-    }
-  }
-
-  private static boolean dropTable(Statement stmt, String name, int 
retryCount) throws SQLException {
-    try {
-      stmt.execute("DROP TABLE " + name);
-      return true;
-    } catch (SQLException e) {
-      if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
-        //failed because object doesn't exist
-        return true;
-      }
-      LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
-        " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " 
retryCount=" + retryCount);
-    }
-    return false;
-  }
-
-  /**
-   * A tool to count the number of partitions, tables,
-   * and databases locked by a particular lockId.
-   *
-   * @param lockId lock id to look for lock components
-   *
-   * @return number of components, or 0 if there is no lock
-   */
-  public static int countLockComponents(long lockId) throws Exception {
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      conn = getConnection();
-      stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE 
hl_lock_ext_id = ?");
-      stmt.setLong(1, lockId);
-      rs = stmt.executeQuery();
-      if (!rs.next()) {
-        return 0;
-      }
-      return rs.getInt(1);
-    } finally {
-      closeResources(conn, stmt, rs);
-    }
-  }
-
-  /**
-   * Utility method used to run COUNT queries like "select count(*) from ..." 
against metastore tables
-   * @param countQuery countQuery text
-   * @return count countQuery result
-   * @throws Exception
-   */
-  public static int countQueryAgent(String countQuery) throws Exception {
-    Connection conn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-      rs = stmt.executeQuery(countQuery);
-      if (!rs.next()) {
-        return 0;
-      }
-      return rs.getInt(1);
-    } finally {
-      closeResources(conn, stmt, rs);
-    }
-  }
-  @VisibleForTesting
-  public static String queryToString(String query) throws Exception {
-    return queryToString(query, true);
-  }
-  public static String queryToString(String query, boolean includeHeader) 
throws Exception {
-    Connection conn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    StringBuilder sb = new StringBuilder();
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-      rs = stmt.executeQuery(query);
-      ResultSetMetaData rsmd = rs.getMetaData();
-      if(includeHeader) {
-        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
-          sb.append(rsmd.getColumnName(colPos)).append("   ");
-        }
-        sb.append('\n');
-      }
-      while(rs.next()) {
-        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
-          sb.append(rs.getObject(colPos)).append("   ");
-        }
-        sb.append('\n');
-      }
-    } finally {
-      closeResources(conn, stmt, rs);
-    }
-    return sb.toString();
-  }
-
-  static Connection getConnection() throws Exception {
-    HiveConf conf = new HiveConf();
-    String jdbcDriver = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
-    Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
-    Properties prop = new Properties();
-    String driverUrl = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORECONNECTURLKEY);
-    String user = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
-    String passwd = MetastoreConf.getPassword(conf, 
MetastoreConf.ConfVars.PWD);
-    prop.setProperty("user", user);
-    prop.setProperty("password", passwd);
-    Connection conn = driver.connect(driverUrl, prop);
-    conn.setAutoCommit(true);
-    return conn;
-  }
-
-  static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
-    if (rs != null) {
-      try {
-        rs.close();
-      } catch (SQLException e) {
-        LOG.error("Error closing ResultSet: " + e.getMessage());
-      }
-    }
-
-    if (stmt != null) {
-      try {
-        stmt.close();
-      } catch (SQLException e) {
-        System.err.println("Error closing Statement: " + e.getMessage());
-      }
-    }
-
-    if (conn != null) {
-      try {
-        conn.rollback();
-      } catch (SQLException e) {
-        System.err.println("Error rolling back: " + e.getMessage());
-      }
-      try {
-        conn.close();
-      } catch (SQLException e) {
-        System.err.println("Error closing Connection: " + e.getMessage());
-      }
-    }
-  }
-}

Reply via email to