http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java deleted file mode 100644 index 60839fa..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ /dev/null @@ -1,960 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import org.apache.hadoop.hive.common.classification.RetrySemantics; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Extends the transaction handler with methods needed only by the compactor threads. These - * methods are not available through the thrift interface. - */ -class CompactionTxnHandler extends TxnHandler { - static final private String CLASS_NAME = CompactionTxnHandler.class.getName(); - static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - - public CompactionTxnHandler() { - } - - /** - * This will look through the completed_txn_components table and look for partitions or tables - * that may be ready for compaction. Also, look through txns and txn_components tables for - * aborted transactions that we should add to the list. - * @param maxAborted Maximum number of aborted queries to allow before marking this as a - * potential compaction. - * @return list of CompactionInfo structs. These will not have id, type, - * or runAs set since these are only potential compactions not actual ones. 
- */ - @Override - @RetrySemantics.ReadOnly - public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException { - Connection dbConn = null; - Set<CompactionInfo> response = new HashSet<CompactionInfo>(); - Statement stmt = null; - ResultSet rs = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - // Check for completed transactions - String s = "select distinct ctc_database, ctc_table, " + - "ctc_partition from COMPLETED_TXN_COMPONENTS"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - response.add(info); - } - rs.close(); - - // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + - "from TXNS, TXN_COMPONENTS " + - "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + - "group by tc_database, tc_table, tc_partition " + - "having count(*) > " + maxAborted; - - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - info.tooManyAborts = true; - response.add(info); - } - - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - LOG.error("Unable to connect to transaction database " + e.getMessage()); - checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")"); - } finally { - close(rs, stmt, dbConn); - } - return response; - } - catch (RetryException e) { - return findPotentialCompactions(maxAborted); - } - } - - /** - * Sets the user to run as. This is for the case - * where the request was generated by the user and so the worker must set this value later. - * @param cq_id id of this entry in the queue - * @param user user to run the jobs as - */ - @Override - @RetrySemantics.Idempotent - public void setRunAs(long cq_id, String user) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; - LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); - if (updCnt != 1) { - LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to update compaction queue, " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")"); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); - } - } catch (RetryException e) { - setRunAs(cq_id, user); - } - } - - /** - * This will grab the next compaction request off of - * the queue, and assign it to the worker. - * @param workerId id of the worker calling this, will be recorded in the db - * @return an info element for this compaction request, or null if there is no work to do now. 
- */ - @Override - @RetrySemantics.SafeToRetry - public CompactionInfo findNextToCompact(String workerId) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725) - Statement updStmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("No compactions found ready to compact"); - dbConn.rollback(); - return null; - } - updStmt = dbConn.createStatement(); - do { - CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); - info.properties = rs.getString(6); - // Now, update this record as being worked on by this worker. - long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + - " AND cq_state='" + INITIATED_STATE + "'"; - LOG.debug("Going to execute update <" + s + ">"); - int updCount = updStmt.executeUpdate(s); - if(updCount == 1) { - dbConn.commit(); - return info; - } - if(updCount == 0) { - LOG.debug("Another Worker picked up " + info); - continue; - } - LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + - info + ". updCnt=" + updCount + "."); - dbConn.rollback(); - return null; - } while( rs.next()); - dbConn.rollback(); - return null; - } catch (SQLException e) { - LOG.error("Unable to select next element for compaction, " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeStmt(updStmt); - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return findNextToCompact(workerId); - } - } - - /** - * This will mark an entry in the queue as compacted - * and put it in the ready to clean state. - * @param info info on the compaction entry to mark as compacted. - */ - @Override - @RetrySemantics.SafeToRetry - public void markCompacted(CompactionInfo info) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); - if (updCnt != 1) { - LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". 
updCnt=" + updCnt); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to update compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "markCompacted(" + info + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); - } - } catch (RetryException e) { - markCompacted(info); - } - } - - /** - * Find entries in the queue that are ready to - * be cleaned. - * @return information on the entry in the queue. - */ - @Override - @RetrySemantics.ReadOnly - public List<CompactionInfo> findReadyToClean() throws MetaException { - Connection dbConn = null; - List<CompactionInfo> rc = new ArrayList<CompactionInfo>(); - - Statement stmt = null; - ResultSet rs = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } - info.runAs = rs.getString(6); - info.highestTxnId = rs.getLong(7); - rc.add(info); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return rc; - } catch (SQLException e) { - LOG.error("Unable to select next element for cleaning, " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findReadyToClean"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return findReadyToClean(); - } - } - - /** - * This will remove an entry from the queue after - * it has been compacted. 
- * - * @param info info on the compaction entry to remove - */ - @Override - @RetrySemantics.CannotRetry - public void markCleaned(CompactionInfo info) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - PreparedStatement pStmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); - if(rs.next()) { - info = CompactionInfo.loadFullFromCompactionQueue(rs); - } - else { - throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); - } - close(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - int updCount = stmt.executeUpdate(s); - if (updCount != 1) { - LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); - info.state = SUCCEEDED_STATE; - CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); - updCount = pStmt.executeUpdate(); - - // Remove entries from completed_txn_components as well, so we don't start looking there - // again but only up to the highest txn ID included in this compaction job. - //highestTxnId will be NULL in upgrade scenarios - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; - if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; - } - if(info.highestTxnId != 0) { - s += " and ctc_txnid <= " + info.highestTxnId; - } - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { - LOG.error("Expected to remove at least one row from completed_txn_components when " + - "marking compaction entry as clean!"); - } - - s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'" + (info.highestTxnId == 0 ? 
"" : " and txn_id <= " + info.highestTxnId); - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; - LOG.debug("Going to execute update <" + s + ">"); - rs = stmt.executeQuery(s); - List<Long> txnids = new ArrayList<>(); - while (rs.next()) txnids.add(rs.getLong(1)); - // Remove entries from txn_components, as there may be aborted txn components - if (txnids.size() > 0) { - List<String> queries = new ArrayList<String>(); - - // Prepare prefix and suffix - StringBuilder prefix = new StringBuilder(); - StringBuilder suffix = new StringBuilder(); - - prefix.append("delete from TXN_COMPONENTS where "); - - //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" and tc_database = "); - suffix.append(quoteString(info.dbname)); - suffix.append(" and tc_table = "); - suffix.append(quoteString(info.tableName)); - if (info.partName != null) { - suffix.append(" and tc_partition = "); - suffix.append(quoteString(info.partName)); - } - - // Populate the complete query with provided prefix and suffix - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); - - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); - LOG.debug("Removed " + rc + " records from txn_components"); - - // Don't bother cleaning from the txns table. A separate call will do that. We don't - // know here which txns still have components from other tables or partitions in the - // table, so we don't know which ones we can and cannot clean. - } - } - - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "markCleaned(" + info + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeStmt(pStmt); - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - markCleaned(info); - } - } - - /** - * Clean up aborted transactions from txns that have no components in txn_components. The reason such - * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and - * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. - */ - @Override - @RetrySemantics.SafeToRetry - public void cleanEmptyAbortedTxns() throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - //Aborted is a terminal state, so nothing about the txn can change - //after that, so READ COMMITTED is sufficient. 
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - List<Long> txnids = new ArrayList<>(); - while (rs.next()) txnids.add(rs.getLong(1)); - close(rs); - if(txnids.size() <= 0) { - return; - } - Collections.sort(txnids);//easier to read logs - List<String> queries = new ArrayList<String>(); - StringBuilder prefix = new StringBuilder(); - StringBuilder suffix = new StringBuilder(); - - prefix.append("delete from TXNS where "); - suffix.append(""); - - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); - - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); - LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); - } - LOG.info("Aborted transactions removed from TXNS: " + txnids); - - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to delete from txns table " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - cleanEmptyAbortedTxns(); - } - } - - /** - * This will take all entries assigned to workers - * on a host and return them to INITIATED state. The initiator should use this at start up to - * clean entries from any workers that were in the middle of compacting when the metastore - * shut down. It does not reset entries from worker threads on other hosts as those may still - * be working. - * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, - * so that a LIKE clause of hostname% will match the worker id. - */ - @Override - @RetrySemantics.Idempotent - public void revokeFromLocalWorkers(String hostname) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; - LOG.debug("Going to execute update <" + s + ">"); - // It isn't an error if the following returns no rows, as the local workers could have died - // with nothing assigned to them. - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); - } - } catch (RetryException e) { - revokeFromLocalWorkers(hostname); - } - } - - /** - * This call will move all compaction queue - * entries assigned to a worker but past the timeout back to the initiated state. 
- * This should be called by the initiator on start up and occasionally when running to clean up - * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called - * first. - * @param timeout number of milliseconds since start time that should elapse before a worker is - * declared dead. - */ - @Override - @RetrySemantics.Idempotent - public void revokeTimedoutWorkers(long timeout) throws MetaException { - try { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - long latestValidStart = getDbTime(dbConn) - timeout; - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; - LOG.debug("Going to execute update <" + s + ">"); - // It isn't an error if the following returns no rows, as the local workers could have died - // with nothing assigned to them. - stmt.executeUpdate(s); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); - } - } catch (RetryException e) { - revokeTimedoutWorkers(timeout); - } - } - - /** - * Queries metastore DB directly to find columns in the table which have statistics information. - * If {@code ci} includes partition info then per partition stats info is examined, otherwise - * table level stats are examined. - * @throws MetaException - */ - @Override - @RetrySemantics.ReadOnly - public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - String quote = getIdentifierQuoteString(dbConn); - stmt = dbConn.createStatement(); - StringBuilder bldr = new StringBuilder(); - bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) - .append(" FROM ") - .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) - .append(quote) - .append(" WHERE ") - .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname) - .append("' AND ").append(quote).append("TABLE_NAME").append(quote) - .append(" = '").append(ci.tableName).append("'"); - if (ci.partName != null) { - bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") - .append(ci.partName).append("'"); - } - String s = bldr.toString(); - - /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : - "PART_COL_STATS") - + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" - + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/ - LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); - List<String> columns = new ArrayList<String>(); - while (rs.next()) { - columns.add(rs.getString(1)); - } - LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + - (ci.partName == null ? 
"" : "/" + ci.partName)); - dbConn.commit(); - return columns; - } catch (SQLException e) { - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName + - (ci.partName == null ? "" : "/" + ci.partName) + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException ex) { - return findColumnsWithStats(ci); - } - } - - /** - * Record the highest txn id that the {@code ci} compaction job will pay attention to. - * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids. - */ - @Override - @RetrySemantics.Idempotent - public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { - Connection dbConn = null; - Statement stmt = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId + - " WHERE CQ_ID = " + ci.id); - if(updCount != 1) { - throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); - } - dbConn.commit(); - } catch (SQLException e) { - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(null, stmt, dbConn); - } - } catch (RetryException ex) { - setCompactionHighestTxnId(ci, highestTxnId); - } - } - private static class RetentionCounters { - int attemptedRetention = 0; - int failedRetention = 0; - int succeededRetention = 0; - RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) { - this.attemptedRetention = attemptedRetention; - this.failedRetention = failedRetention; - this.succeededRetention = succeededRetention; - } - } - private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) { - switch (ci.state) { - case ATTEMPTED_STATE: - if(--rc.attemptedRetention < 0) { - deleteSet.add(ci.id); - } - break; - case FAILED_STATE: - if(--rc.failedRetention < 0) { - deleteSet.add(ci.id); - } - break; - case SUCCEEDED_STATE: - if(--rc.succeededRetention < 0) { - deleteSet.add(ci.id); - } - break; - default: - //do nothing to hanlde future RU/D where we may want to add new state types - } - } - - /** - * For any given compactable entity (partition; table if not partitioned) the history of compactions - * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the - * history such that a configurable number of each type of state is present. Any other entries - * can be purged. This scheme has advantage of always retaining the last failure/success even if - * it's not recent. 
- * @throws MetaException - */ - @Override - @RetrySemantics.SafeToRetry - public void purgeCompactionHistory() throws MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - List<Long> deleteSet = new ArrayList<>(); - RetentionCounters rc = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - /*cc_id is monotonically increasing, so for any entity it sorts in order of compaction history; - thus this query groups by entity and within each group sorts most recent first*/ - rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + - "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); - String lastCompactedEntity = null; - /*In each group, walk from most recent and count occurrences of each state type. Once you - * have counted enough (for each state) to satisfy the retention policy, delete all other - * instances of this status.*/ - while(rs.next()) { - CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0)); - if(!ci.getFullPartitionName().equals(lastCompactedEntity)) { - lastCompactedEntity = ci.getFullPartitionName(); - rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), - getFailedCompactionRetention(), - conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); - } - checkForDeletion(deleteSet, ci, rc); - } - close(rs); - - if (deleteSet.size() <= 0) { - return; - } - - List<String> queries = new ArrayList<String>(); - - StringBuilder prefix = new StringBuilder(); - StringBuilder suffix = new StringBuilder(); - - prefix.append("delete from COMPLETED_COMPACTIONS where "); - suffix.append(""); - - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); - - for (String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int count = stmt.executeUpdate(query); - LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); - } - dbConn.commit(); - } catch (SQLException e) { - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "purgeCompactionHistory()"); - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException ex) { - purgeCompactionHistory(); - } - } - /** - * This ensures that the number of failed compaction entries retained is at least the failed - * compaction threshold which prevents new compactions from being scheduled. - */ - private int getFailedCompactionRetention() { - int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED); - if(failedRetention < failedThreshold) { - LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname + - "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + "=" + - failedThreshold + ". Will use " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname + - "=" + failedThreshold); - failedRetention = failedThreshold; - } - return failedRetention; - } - /** - * Returns {@code true} if there already exists a sufficient number of consecutive failures for - * this table/partition so that no new automatic compactions will be scheduled. 
- * User-initiated compactions don't do this check. - * - * Do we allow compacting a whole table (when it's partitioned)? No, though perhaps we should. - * That would be a meta operation, i.e. first find all partitions for this table (which have - * txn info) and schedule each compaction separately. This avoids complications in this logic. - */ - @Override - @RetrySemantics.ReadOnly - public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " + - "CC_DATABASE = " + quoteString(ci.dbname) + " and " + - "CC_TABLE = " + quoteString(ci.tableName) + - (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") + - " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); - int numFailed = 0; - int numTotal = 0; - int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); - while(rs.next() && ++numTotal <= failedThreshold) { - if(rs.getString(1).charAt(0) == FAILED_STATE) { - numFailed++; - } - else { - numFailed--; - } - } - return numFailed == failedThreshold; - } - catch (SQLException e) { - LOG.error("Unable to check for failed compactions " + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); - LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); - return false;//weren't able to check - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return checkFailedCompactions(ci); - } - } - /** - * If there is an entry in compaction_queue with ci.id, remove it. - * Make an entry in completed_compactions with status 'f'. - * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction, - * which we record as an ATTEMPTED_STATE entry in history. 
- */ - @Override - @RetrySemantics.CannotRetry - public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw - //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure - try { - Connection dbConn = null; - Statement stmt = null; - PreparedStatement pStmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); - if(rs.next()) { - ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; - LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); - } - else { - if(ci.id > 0) { - //the record with valid CQ_ID has disappeared - this is a sign of something wrong - throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); - } - } - if(ci.id == 0) { - //The failure occurred before we even made an entry in COMPACTION_QUEUE - //generate ID so that we can make an entry in COMPLETED_COMPACTIONS - ci.id = generateCompactionQueueId(stmt); - //mostly this indicates that the Initiator is paying attention to some table even though - //compactions are not happening. - ci.state = ATTEMPTED_STATE; - //this is not strictly accurate, but 'type' cannot be null. - if(ci.type == null) { ci.type = CompactionType.MINOR; } - ci.start = getDbTime(dbConn); - } - else { - ci.state = FAILED_STATE; - } - close(rs, stmt, null); - - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); - CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); - int updCount = pStmt.executeUpdate(); - LOG.debug("Going to commit"); - closeStmt(pStmt); - dbConn.commit(); - } catch (SQLException e) { - LOG.warn("markFailed(" + ci.id + "):" + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - try { - checkRetryable(dbConn, e, "markFailed(" + ci + ")"); - } - catch(MetaException ex) { - LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); - } - LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e); - } finally { - close(rs, stmt, null); - close(null, pStmt, dbConn); - } - } catch (RetryException e) { - markFailed(ci); - } - } - @Override - @RetrySemantics.Idempotent - public void setHadoopJobId(String hadoopJobId, long id) { - try { - Connection dbConn = null; - Statement stmt = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id; - LOG.debug("Going to execute <" + s + ">"); - int updateCount = stmt.executeUpdate(s); - LOG.debug("Going to commit"); - closeStmt(stmt); - dbConn.commit(); - } catch (SQLException e) { - LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage()); - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - try { - checkRetryable(dbConn, e, "setHadoopJobId(" + 
hadoopJobId + "," + id + ")"); - } - catch(MetaException ex) { - LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); - } - LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e); - } finally { - close(null, stmt, dbConn); - } - } catch (RetryException e) { - setHadoopJobId(hadoopJobId, id); - } - } -} - -
http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java deleted file mode 100644 index 0161894..0000000 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn; - -import java.sql.Connection; -import java.sql.Driver; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.SQLTransactionRollbackException; -import java.sql.Statement; -import java.util.Properties; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; - -/** - * Utility methods for creating and destroying txn database/schema, plus methods for - * querying against metastore tables. - * Placed here in a separate class so it can be shared across unit tests. - */ -public final class TxnDbUtil { - - static final private Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName()); - private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; - - private static int deadlockCnt = 0; - - private TxnDbUtil() { - throw new UnsupportedOperationException("Can't initialize class"); - } - - /** - * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true, - * and the JDBC configs will be set for putting the transaction and lock info in the embedded - * metastore. - * - * @param conf HiveConf to add these values to - */ - public static void setConfValues(HiveConf conf) { - conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER); - conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); - } - - public static void prepDb() throws Exception { - // This is a bogus hack because it copies the contents of the SQL file - // intended for creating derby databases, and thus will inexorably get - // out of date with it. I'm open to any suggestions on how to make this - // read the file in a build friendly way. 
- - Connection conn = null; - Statement stmt = null; - try { - conn = getConnection(); - stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + - " TXN_STATE char(1) NOT NULL," + - " TXN_STARTED bigint NOT NULL," + - " TXN_LAST_HEARTBEAT bigint NOT NULL," + - " TXN_USER varchar(128) NOT NULL," + - " TXN_HOST varchar(128) NOT NULL)"); - - stmt.execute("CREATE TABLE TXN_COMPONENTS (" + - " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + - " TC_DATABASE varchar(128) NOT NULL," + - " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767)," + - " TC_OPERATION_TYPE char(1) NOT NULL)"); - stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + - " CTC_TXNID bigint," + - " CTC_DATABASE varchar(128) NOT NULL," + - " CTC_TABLE varchar(128)," + - " CTC_PARTITION varchar(767))"); - stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); - stmt.execute("CREATE TABLE HIVE_LOCKS (" + - " HL_LOCK_EXT_ID bigint NOT NULL," + - " HL_LOCK_INT_ID bigint NOT NULL," + - " HL_TXNID bigint," + - " HL_DB varchar(128) NOT NULL," + - " HL_TABLE varchar(128)," + - " HL_PARTITION varchar(767)," + - " HL_LOCK_STATE char(1) NOT NULL," + - " HL_LOCK_TYPE char(1) NOT NULL," + - " HL_LAST_HEARTBEAT bigint NOT NULL," + - " HL_ACQUIRED_AT bigint," + - " HL_USER varchar(128) NOT NULL," + - " HL_HOST varchar(128) NOT NULL," + - " HL_HEARTBEAT_COUNT integer," + - " HL_AGENT_INFO varchar(128)," + - " HL_BLOCKEDBY_EXT_ID bigint," + - " HL_BLOCKEDBY_INT_ID bigint," + - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); - stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); - - stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); - - stmt.execute("CREATE TABLE COMPACTION_QUEUE (" + - " CQ_ID bigint PRIMARY KEY," + - " CQ_DATABASE varchar(128) NOT NULL," + - " CQ_TABLE varchar(128) NOT NULL," + - " CQ_PARTITION varchar(767)," + - " CQ_STATE char(1) NOT NULL," + - " CQ_TYPE char(1) NOT NULL," + - " CQ_TBLPROPERTIES varchar(2048)," + - " CQ_WORKER_ID varchar(128)," + - " CQ_START bigint," + - " CQ_RUN_AS varchar(128)," + - " CQ_HIGHEST_TXN_ID bigint," + - " CQ_META_INFO varchar(2048) for bit data," + - " CQ_HADOOP_JOB_ID varchar(32))"); - - stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); - stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); - - stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" + - " CC_ID bigint PRIMARY KEY," + - " CC_DATABASE varchar(128) NOT NULL," + - " CC_TABLE varchar(128) NOT NULL," + - " CC_PARTITION varchar(767)," + - " CC_STATE char(1) NOT NULL," + - " CC_TYPE char(1) NOT NULL," + - " CC_TBLPROPERTIES varchar(2048)," + - " CC_WORKER_ID varchar(128)," + - " CC_START bigint," + - " CC_END bigint," + - " CC_RUN_AS varchar(128)," + - " CC_HIGHEST_TXN_ID bigint," + - " CC_META_INFO varchar(2048) for bit data," + - " CC_HADOOP_JOB_ID varchar(32))"); - - stmt.execute("CREATE TABLE AUX_TABLE (" + - " MT_KEY1 varchar(128) NOT NULL," + - " MT_KEY2 bigint NOT NULL," + - " MT_COMMENT varchar(255)," + - " PRIMARY KEY(MT_KEY1, MT_KEY2))"); - - stmt.execute("CREATE TABLE WRITE_SET (" + - " WS_DATABASE varchar(128) NOT NULL," + - " WS_TABLE varchar(128) NOT NULL," + - " WS_PARTITION varchar(767)," + - " WS_TXNID bigint NOT NULL," + - " WS_COMMIT_ID bigint NOT NULL," + - " WS_OPERATION_TYPE char(1) NOT NULL)" - ); - } catch (SQLException e) { - try { - conn.rollback(); - 
} catch (SQLException re) { - LOG.error("Error rolling back: " + re.getMessage()); - } - - // This might be a deadlock; if so, let's retry - if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) { - LOG.warn("Caught deadlock, retrying db creation"); - prepDb(); - } else { - throw e; - } - } finally { - deadlockCnt = 0; - closeResources(conn, stmt, null); - } - } - - public static void cleanDb() throws Exception { - int retryCount = 0; - while(++retryCount <= 3) { - boolean success = true; - Connection conn = null; - Statement stmt = null; - try { - conn = getConnection(); - stmt = conn.createStatement(); - - // We want to try these, whether they succeed or fail. - try { - stmt.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (SQLException e) { - if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { - //42X65/30000 means the index doesn't exist - LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() + - " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); - success = false; - } - } - - success &= dropTable(stmt, "TXN_COMPONENTS", retryCount); - success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); - success &= dropTable(stmt, "TXNS", retryCount); - success &= dropTable(stmt, "NEXT_TXN_ID", retryCount); - success &= dropTable(stmt, "HIVE_LOCKS", retryCount); - success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount); - success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount); - success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount); - success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount); - success &= dropTable(stmt, "AUX_TABLE", retryCount); - success &= dropTable(stmt, "WRITE_SET", retryCount); - } finally { - closeResources(conn, stmt, null); - } - if(success) { - return; - } - } - } - - private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException { - try { - stmt.execute("DROP TABLE " + name); - return true; - } catch (SQLException e) { - if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) { - //failed because object doesn't exist - return true; - } - LOG.error("Unable to drop table " + name + ": " + e.getMessage() + - " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); - } - return false; - } - - /** - * A tool to count the number of partitions, tables, - * and databases locked by a particular lockId. - * - * @param lockId lock id to look for lock components - * - * @return number of components, or 0 if there is no lock - */ - public static int countLockComponents(long lockId) throws Exception { - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - try { - conn = getConnection(); - stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?"); - stmt.setLong(1, lockId); - rs = stmt.executeQuery(); - if (!rs.next()) { - return 0; - } - return rs.getInt(1); - } finally { - closeResources(conn, stmt, rs); - } - } - - /** - * Utility method used to run COUNT queries like "select count(*) from ..." 
against metastore tables - * @param countQuery count query text - * @return the count query result - * @throws Exception - */ - public static int countQueryAgent(String countQuery) throws Exception { - Connection conn = null; - Statement stmt = null; - ResultSet rs = null; - try { - conn = getConnection(); - stmt = conn.createStatement(); - rs = stmt.executeQuery(countQuery); - if (!rs.next()) { - return 0; - } - return rs.getInt(1); - } finally { - closeResources(conn, stmt, rs); - } - } - @VisibleForTesting - public static String queryToString(String query) throws Exception { - return queryToString(query, true); - } - public static String queryToString(String query, boolean includeHeader) throws Exception { - Connection conn = null; - Statement stmt = null; - ResultSet rs = null; - StringBuilder sb = new StringBuilder(); - try { - conn = getConnection(); - stmt = conn.createStatement(); - rs = stmt.executeQuery(query); - ResultSetMetaData rsmd = rs.getMetaData(); - if(includeHeader) { - for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { - sb.append(rsmd.getColumnName(colPos)).append(" "); - } - sb.append('\n'); - } - while(rs.next()) { - for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { - sb.append(rs.getObject(colPos)).append(" "); - } - sb.append('\n'); - } - } finally { - closeResources(conn, stmt, rs); - } - return sb.toString(); - } - - static Connection getConnection() throws Exception { - HiveConf conf = new HiveConf(); - String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER); - Driver driver = (Driver) Class.forName(jdbcDriver).newInstance(); - Properties prop = new Properties(); - String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY); - String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME); - String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); - prop.setProperty("user", user); - prop.setProperty("password", passwd); - Connection conn = driver.connect(driverUrl, prop); - conn.setAutoCommit(true); - return conn; - } - - static void closeResources(Connection conn, Statement stmt, ResultSet rs) { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOG.error("Error closing ResultSet: " + e.getMessage()); - } - } - - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - LOG.error("Error closing Statement: " + e.getMessage()); - } - } - - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException e) { - LOG.error("Error rolling back: " + e.getMessage()); - } - try { - conn.close(); - } catch (SQLException e) { - LOG.error("Error closing Connection: " + e.getMessage()); - } - } - } -}
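
[Editorial note on the diff above] TxnDbUtil exists to be shared across unit tests, so its typical lifecycle is: point the HiveConf at DbTxnManager, create the transaction tables in the embedded Derby database, make assertions via the query helpers, and drop everything afterwards. The sketch below is a plausible usage based only on the signatures visible in this diff (note that prepDb() takes no arguments in this version); it is not taken from an actual Hive test.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;

    /** Sketch of driving TxnDbUtil from a test, using only the methods shown in the diff above. */
    public class TxnDbUtilUsageSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        TxnDbUtil.setConfValues(conf);  // use DbTxnManager and enable concurrency
        TxnDbUtil.prepDb();             // create the txn tables in the embedded Derby database
        try {
          // Inspect metastore state with the query helpers.
          int txns = TxnDbUtil.countQueryAgent("select count(*) from TXNS");
          System.out.println("TXNS rows: " + txns);
          System.out.println(TxnDbUtil.queryToString("select * from NEXT_TXN_ID"));
        } finally {
          TxnDbUtil.cleanDb();          // drop everything so the next test starts clean
        }
      }
    }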
