http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig new file mode 100644 index 0000000..bc818e0 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig @@ -0,0 +1,3233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Service; +import com.jolbox.bonecp.BoneCPConfig; +import com.jolbox.bonecp.BoneCPDataSource; +import org.apache.commons.dbcp.ConnectionFactory; +import org.apache.commons.dbcp.DriverManagerConnectionFactory; +import org.apache.commons.dbcp.PoolableConnectionFactory; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hive.common.ServerUtils; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.dbcp.PoolingDataSource; + +import org.apache.commons.pool.ObjectPool; +import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.util.StringUtils; + +import javax.sql.DataSource; + +import java.io.IOException; +import java.sql.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +/** + * A handler to answer transaction related calls that come into the metastore + * server. + * + * Note on log messages: Please include txnid:X and lockid info using + * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} + * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. 
+ * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL but + * currently the DB schema has this NOT NULL) and only update/read the heartbeat from the corresponding + * transaction in TXNS. + * + * In general there can be multiple metastores where this logic can execute, thus the DB is + * used to ensure proper mutexing of operations. + * Select ... For Update (or equivalent: either MsSql with(updlock) or an actual Update stmt) is + * used to properly sequence operations. Most notably: + * 1. various sequence IDs are generated with the aid of this mutex + * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state + * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all + * per-transaction ops either start by update/delete of the relevant TXNS row or do S4U on that row. + * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks. + * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock + * can be granted, no other (strictly speaking "earlier") lock can change state. + * + * The exception to this is Derby, which doesn't support proper S4U. Derby is always running embedded + * (this is the only supported configuration for Derby) + * in the same JVM as HiveMetaStoreHandler, thus we use a JVM-wide lock to properly sequence the operations. + * + * {@link #derbyLock} + + * If we ever decide to run a remote Derby server, according to + * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be + * serialized, so that would also work, though it has not been tested. + * + * General design note: + * It's imperative that any operation on a txn (e.g. commit) ensures (atomically) that this txn is + * still valid and active. In the code this is usually achieved at the same time the txn record + * is locked for some operation.
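+ *
+ * As an illustrative sketch of the S4U mutexing described above (assuming a READ_COMMITTED
+ * connection; addForUpdateClause() below appends the dialect-specific FOR UPDATE syntax):
+ * <pre>
+ *   Statement stmt = dbConn.createStatement();
+ *   // blocks here until any competing metastore commits/rolls back its own S4U on this row
+ *   ResultSet rs = stmt.executeQuery(addForUpdateClause("select ntxn_next from NEXT_TXN_ID"));
+ *   // ... read/transition the state that must change atomically ...
+ *   dbConn.commit(); // releases the row lock
+ * </pre>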
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { + + static final protected char INITIATED_STATE = 'i'; + static final protected char WORKING_STATE = 'w'; + static final protected char READY_FOR_CLEANING = 'r'; + static final char FAILED_STATE = 'f'; + static final char SUCCEEDED_STATE = 's'; + static final char ATTEMPTED_STATE = 'a'; + + // Compactor types + static final protected char MAJOR_TYPE = 'a'; + static final protected char MINOR_TYPE = 'i'; + + // Transaction states + static final protected char TXN_ABORTED = 'a'; + static final protected char TXN_OPEN = 'o'; + + // Lock states + static final protected char LOCK_ACQUIRED = 'a'; + static final protected char LOCK_WAITING = 'w'; + + // Lock types + static final protected char LOCK_EXCLUSIVE = 'e'; + static final protected char LOCK_SHARED = 'r'; + static final protected char LOCK_SEMI_SHARED = 'w'; + + static final private int ALLOWED_REPEATED_DEADLOCKS = 10; + static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); + + static private DataSource connPool; + static private boolean doRetryOnConnPool = false; + + private enum OpertaionType { + INSERT('i'), UPDATE('u'), DELETE('d'); + private final char sqlConst; + OpertaionType(char sqlConst) { + this.sqlConst = sqlConst; + } + public String toString() { + return Character.toString(sqlConst); + } + public static OpertaionType fromString(char sqlConst) { + switch (sqlConst) { + case 'i': + return INSERT; + case 'u': + return UPDATE; + case 'd': + return DELETE; + default: + throw new IllegalArgumentException(quoteChar(sqlConst)); + } + } + //we should instead just pass in OpertaionType from client (HIVE-13622) + @Deprecated + public static OpertaionType fromLockType(LockType lockType) { + switch (lockType) { + case SHARED_READ: + return INSERT; + case SHARED_WRITE: + return UPDATE; + default: + throw new IllegalArgumentException("Unexpected lock type: " + lockType); + } + } + } + + /** + * Number of consecutive deadlocks we have seen + */ + private int deadlockCnt; + private long deadlockRetryInterval; + protected HiveConf conf; + protected DatabaseProduct dbProduct; + + // (End user) Transaction timeout, in milliseconds. + private long timeout; + + private String identifierQuoteString; // quotes to use for quoting tables, where necessary + private long retryInterval; + private int retryLimit; + private int retryNum; + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); + /** + * must be static since even in UT there may be > 1 instance of TxnHandler + * (e.g. via Compactor services) + */ + private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); + private static final String hostname = ServerUtils.hostname(); + + // Private methods should never catch SQLException and then throw MetaException. The public + // methods depend on SQLException coming back so they can detect and handle deadlocks. Private + // methods should only throw MetaException when they explicitly know there's a logic error and + // they want to throw past the public methods. + // + // All public methods that write to the database have to check for deadlocks when a SQLException + // comes back and handle it if they see one. This has to be done with the connection pooling + // in mind.
To do this they should call checkRetryable() AFTER rolling back the db transaction, + // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. + + public TxnHandler() { + } + + /** + * This is logically part of c'tor and must be called prior to any other method. + * Not physically part of c'tor due to use of reflection + */ + public void setConf(HiveConf conf) { + this.conf = conf; + + checkQFileTestHack(); + + Connection dbConn = null; + // Set up the JDBC connection pool + try { + setupJdbcConnectionPool(conf); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); + } catch (SQLException e) { + String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); + LOG.error(msg); + throw new RuntimeException(e); + } + finally { + closeDbConn(dbConn); + } + + timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); + buildJumpTable(); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, + TimeUnit.MILLISECONDS); + retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); + deadlockRetryInterval = retryInterval / 10; + } + + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { + try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This method can run at READ_COMMITTED as long as + * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. + * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with + * adding corresponding entries into TXNS. The reason is that any txnid below HWM + * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
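+ *
+ * Illustrative example (hypothetical numbers): if NEXT_TXN_ID.ntxn_next is 10 then hwm = 9,
+ * i.e. txnids 1-9 have been allocated. If TXNS holds rows only for txnid 7 (open) and 8 (aborted),
+ * then every other txnid <= 9 is committed; txnid 10 and above are not yet allocated and are
+ * deliberately excluded by the "txn_id <= hwm" WHERE clause below.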
+ */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + List<TxnInfo> txnInfo = new ArrayList<TxnInfo>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + char c = rs.getString(2).charAt(0); + TxnState state; + switch (c) { + case TXN_ABORTED: + state = TxnState.ABORTED; + break; + + case TXN_OPEN: + state = TxnState.OPEN; + break; + + default: + throw new MetaException("Unexpected transaction state " + c + + " found in txns table"); + } + txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + return new GetOpenTxnsInfoResponse(hwm, txnInfo); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxnsInfo"); + throw new MetaException("Unable to select from transaction database: " + getMessage(e) + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return getOpenTxnsInfo(); + } + } + + public GetOpenTxnsResponse getOpenTxns() throws MetaException { + try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. 
+ Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + Set<Long> openList = new HashSet<Long>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id from TXNS where txn_id <= " + hwm; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + openList.add(rs.getLong(1)); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + return new GetOpenTxnsResponse(hwm, openList); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxns"); + throw new MetaException("Unable to select from transaction database, " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return getOpenTxns(); + } + } + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + int numTxns = rqst.getNum_txns(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + lockInternal(); + /** + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. + * Also, advancing the counter must work when multiple metastores are running. + * SELECT ... FOR UPDATE is used to prevent + * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * + * In the current design, there can be several metastore instances running in a given Warehouse. + * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. + * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will think the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt ACID operations). This + * setup could support a write-through cache for added performance. + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + // Make sure the user has not requested an insane amount of txns.
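+ // (If it has, numTxns is silently clamped to the configured max below; the caller can tell
+ // how many txns were actually opened from the size of the returned list.)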
+ int maxTxns = HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); + if (numTxns > maxTxns) numTxns = maxTxns; + + stmt = dbConn.createStatement(); + String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + long first = rs.getLong(1); + s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + long now = getDbTime(dbConn); + List<Long> txnIds = new ArrayList<Long>(numTxns); + ArrayList<String> queries = new ArrayList<String>(); + String query; + String insertClause = "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host) values "; + StringBuilder valuesClause = new StringBuilder(); + + for (long i = first; i < first + numTxns; i++) { + txnIds.add(i); + + if (i > first && + (i - first) % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + // wrap up the current query, and start a new one + query = insertClause + valuesClause.toString(); + queries.add(query); + + valuesClause.setLength(0); + valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now) + .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname()) + .append("')"); + + continue; + } + + if (i > first) { + valuesClause.append(", "); + } + + valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now) + .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname()) + .append("')"); + } + + query = insertClause + valuesClause.toString(); + queries.add(query); + + for (String q : queries) { + LOG.debug("Going to execute update <" + q + ">"); + stmt.execute(q); + } + LOG.debug("Going to commit"); + dbConn.commit(); + return new OpenTxnsResponse(txnIds); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return openTxns(rqst); + } + } + + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException { + long txnid = rqst.getTxnid(); + try { + Connection dbConn = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + unlockInternal(); + } + } catch (RetryException e) { + abortTxn(rqst); + } + } + + public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { + List<Long> txnids = rqst.getTxn_ids(); + try { + Connection dbConn = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); 
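+ // Note: unlike abortTxn() above, a partial result here is not an error; if some requested
+ // txns could not be aborted we only log a WARN below.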
+ int numAborted = abortTxns(dbConn, txnids, false); + if (numAborted != txnids.size()) { + LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + + txnids.size() + " transactions. It's possible that the other " + + (txnids.size() - numAborted) + + " transactions have been aborted or committed, or the transaction ids are invalid."); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxns(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + abortTxns(rqst); + } + } + + /** + * Concurrency/isolation notes: + * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)} + * operations using select4update on NEXT_TXN_ID. Also, mutexes on the TXNS table for specific txnid:X; + * see more notes below. + * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn + * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence + * so that we can compare the commit time of txn T with the start time of txn S. This sequence can be thought of + * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. + * + * Motivating example: + * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1. + * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot + * that they read appropriately. In particular, if txns do not overlap, then one follows the other + * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure + * this by locking in the snapshot after the + * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()}) + * and mutexing openTxn() with commit(). In other words, once S.commit() starts we must ensure + * that txn T, which will be considered a later txn, locks in a snapshot that includes the result + * of S's commit (assuming no other txns). + * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions + * were running in parallel). If T and S both locked in the same snapshot (for example the commit of + * txnid:2, which is possible if commitTxn() and openTxns() are not mutexed), + * 'x' would be updated to the same value by both, i.e. a lost update. + */ + public void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + long txnid = rqst.getTxnid(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet commitIdRs = null, rs; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + /** + * This S4U will mutex with other commitTxn() and openTxns(). + * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial. + * Note: it's possible for several txns to have the same commit id. Suppose 3 txns start + * at the same time and no new txns start until all 3 commit. + * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
+ */ + commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID")); + if(!commitIdRs.next()) { + throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + } + long commitId = commitIdRs.getLong(1); + /** + * Runs at READ_COMMITTED with S4U on the TXNS row for "txnid". S4U ensures that no other + * operation can change this txn (such as acquiring locks). lock() and commitTxn() + * should not normally run concurrently (for the same txn) but could due to bugs in the client, + * which could then corrupt internal transaction manager state. Also competes with abortTxn(). + */ + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + //this also ensures that txn is still there and in the expected state (hasn't been timed out) + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + + " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + + "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"); + if(numCompsWritten == 0) { + /** + * current txn didn't update/delete anything (may have inserted), so just proceed with commit + * + * We only care about the commit id for write txns, so for RO (when supported) txns we don't + * have to mutex on NEXT_TXN_ID. + * Consider: if an RO txn is after a W txn, then RO's openTxns() will be mutexed with W's + * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see the result of the W txn. + * If RO < W, then there is no reads-from relationship. + */ + } + else { + /** + * see if any overlapping txns wrote the same element, i.e. have a conflict + * Since the entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns, + * thus if committed.ws_commit_id < current.ws_txnid, the transactions do NOT overlap. + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + rs = stmt.executeQuery + (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," + + "committed.ws_table, committed.ws_partition, cur.ws_commit_id " + + "from WRITE_SET committed INNER JOIN WRITE_SET cur " + + "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " + + //For partitioned table we always track writes at partition level (never at table) + //and for non partitioned - always at table level, thus the same table should never + //have entries with partition key and w/o + "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " + + "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commitTxn() op + " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns + //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all + " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + + " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")")); + if(rs.next()) { + //found a conflict + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if(partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + + " committed by " + committedTxn; + close(rs); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if(abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + close(null, stmt, dbConn); + throw new TxnAbortedException(msg); + } + else { + //no conflicting operations, proceed with the rest of commit sequence + } + } + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute insert <" + s + ">"); + if (stmt.executeUpdate(s) < 1) { + //this can be reasonable for an empty txn START/COMMIT or read-only txn + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! 
" + JavaUtils.txnIdToString(txnid)); + } + s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + s = "delete from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(commitIdRs); + close(lockHandle, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + commitTxn(rqst); + } + } + @Override + public void performWriteSetGC() { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + if(!rs.next()) { + throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); + } + long highestAllocatedTxnId = rs.getLong(1); + close(rs); + rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); + if(!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark + long lowestOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + //if here then there are no Open txns and highestAllocatedTxnId must be + //resolved (i.e. committed or aborted), either way + //there are no open txns with id <= highestAllocatedTxnId + //the +1 is there because "delete ..." below has < (which is correct for the case when + //there is an open txn + //Concurrency: even if new txn starts (or starts + commits) it is still true that + //there are no currently open txns that overlap with any committed txn with + //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. + commitHighWaterMark = highestAllocatedTxnId + 1; + } + else { + commitHighWaterMark = lowestOpenTxnId; + } + int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); + LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET"); + dbConn.commit(); + } catch (SQLException ex) { + LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); + } + finally { + close(rs, stmt, dbConn); + } + } + /** + * As much as possible (i.e. in absence of retries) we want both operations to be done on the same + * connection (but separate transactions). This avoid some flakiness in BONECP where if you + * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one + * doesn't see results of the first. + */ + public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); + try { + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + } + catch(NoSuchLockException e) { + // This should never happen, as we just added the lock id + throw new MetaException("Couldn't find a lock we just created! 
" + e.getMessage()); + } + } + private static final class ConnectionLockIdPair { + private final Connection dbConn; + private final long extLockId; + private ConnectionLockIdPair(Connection dbConn, long extLockId) { + this.dbConn = dbConn; + this.extLockId = extLockId; + } + } + + /** + * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read + * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but + * to the same value as before thus forcing db to acquire write lock for duration of the transaction. + * + * There is no real reason to return the ResultSet here other than to make sure the reference to it + * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock + * to be released. + * @param txnState the state this txn is expected to be in. may be null + * @return null if no row was found + * @throws SQLException + * @throws MetaException + */ + private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); + ResultSet rs = stmt.executeQuery(addForUpdateClause(query)); + if(rs.next()) { + return rs; + } + close(rs); + return null; + } + + /** + * This enters locks into the queue in {@link #LOCK_WAITING} mode. + * + * Isolation Level Notes: + * 1. We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes + * any 2 {@code enqueueLockWithRetry()} calls. + * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations + * @see #checkLockWithRetry(Connection, long, long) + */ + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + boolean success = false; + Connection dbConn = null; + try { + Statement stmt = null; + ResultSet rs = null; + ResultSet lockHandle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long txnid = rqst.getTxnid(); + stmt = dbConn.createStatement(); + if (isValidTxn(txnid)) { + //this also ensures that txn is still there in expected state + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + } + /** Get the next lock id. + * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. 
Then 7 unblocks, + * and adds its W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than the one it's checking*/ + String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + if (txnid > 0) { + /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get) + * So if we add that to LockRequest we'll know that here + * Should probably add it to LockComponent so that in the future we can allow 1 LockRequest + * to contain LockComponents for multiple operations. + * Deriving it from lock info doesn't distinguish between Update and Delete + * + * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc + * FileSinkDesc.table is ql.metadata.Table + * Table.tableSpec which is TableSpec, which has specType which is SpecType + * So maybe this can work to know that this is part of dynamic partition insert in which case + * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here. + * In any case, that's an optimization for now; will be required when adding multi-stmt txns + */ + // For each component in this lock request, + // add an entry to the txn_components table + // This must be done before HIVE_LOCKS is accessed + for (LockComponent lc : rqst.getComponent()) { + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + s = "insert into TXN_COMPONENTS " + + "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " + + "values (" + txnid + ", '" + dbName + "', " + + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? "null" : "'" + partName + "'")+ "," + + quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } + + long intLockId = 0; + for (LockComponent lc : rqst.getComponent()) { + intLockId++; + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + LockType lockType = lc.getType(); + char lockChar = 'z'; + switch (lockType) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + long now = getDbTime(dbConn); + s = "insert into HIVE_LOCKS " + + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + + " values (" + extLockId + ", " + + +intLockId + "," + txnid + ", '" + + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + + ", " + (partName == null ? "null" : "'" + partName + "'") + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + (isValidTxn(txnid) ?
0 : now) + ", '" + + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + dbConn.commit(); + success = true; + return new ConnectionLockIdPair(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(lockHandle); + close(rs, stmt, null); + if (!success) { + /* This needs to return a "live" connection to be used by operation that follows it. + Thus it only closes Connection on failure/retry. */ + closeDbConn(dbConn); + } + unlockInternal(); + } + } + catch(RetryException e) { + return enqueueLockWithRetry(rqst); + } + } + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { + try { + try { + lockInternal(); + if(dbConn.isClosed()) { + //should only get here if retrying this op + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + } + return checkLock(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + unlockInternal(); + closeDbConn(dbConn); + } + } + catch(RetryException e) { + return checkLockWithRetry(dbConn, extLockId, txnId); + } + } + /** + * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one. + * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change. + * + * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(), + * in practice more often) + * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB. + * + * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired. + * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change. + * + * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking + * against doesn't move from W to A in another txn) but this method can heartbeat in + * separate txn at READ_COMMITTED. + */ + public LockResponse checkLock(CheckLockRequest rqst) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + try { + Connection dbConn = null; + long extLockId = rqst.getLockid(); + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + // Heartbeat on the lockid first, to assure that our lock is still valid. + // Then look up the lock info (hopefully in the cache). If these locks + // are associated with a transaction then heartbeat on that as well. + LockInfo info = getTxnIdFromLockId(dbConn, extLockId); + if(info == null) { + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if (info.txnId > 0) { + heartbeatTxn(dbConn, info.txnId); + } + else { + heartbeatLock(dbConn, extLockId); + } + //todo: strictly speaking there is a bug here. 
heartbeat*() commits but both heartbeat and + //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried + //extra heartbeat is logically harmless, but ... + return checkLock(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); + throw new MetaException("Unable to update transaction database " + + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return checkLock(rqst); + } + + } + + /** + * This would have been made simpler if all locks were associated with a txn. Then only the txn needs to + * be heartbeated, committed, etc.; no need for the client to track individual locks. + * When removing locks not associated with a txn, this potentially conflicts with + * heartbeat/performTimeout, which are update/delete of HIVE_LOCKS and thus will be locked as needed by the db. + * Since this only removes from HIVE_LOCKS, at worst some lock acquire is delayed. + */ + public void unlock(UnlockRequest rqst) + throws NoSuchLockException, TxnOpenException, MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + long extLockId = rqst.getLockid(); + try { + /** + * This method is logically like commit for read-only auto commit queries. + * READ_COMMITTED since this only has 1 delete statement and no new entries with the + * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are + * created in a single atomic operation. + * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)} + * but hl_lock_ext_id is not known until that method returns. + * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)} + * but using SERIALIZABLE doesn't materially change the interaction. + * If the "delete" stmt misses, additional logic is best effort to produce a meaningful error msg. + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + //hl_txnid <> 0 means it's associated with a transaction + String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" + + " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))"; + //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where + //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + LockInfo info = getTxnIdFromLockId(dbConn, extLockId); + if(info == null) { + //didn't find any lock with extLockId but at ReadCommitted there is a possibility that + //it existed when the above delete ran but it didn't have the expected state. + LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")"); + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if(info.txnId != 0) { + String msg = "Unlocking locks associated with transaction not permitted. " + info; + LOG.error(msg); + throw new TxnOpenException(msg); + } + if(info.txnId == 0) { + //we didn't see this lock when running the DELETE stmt above but now it showed up + //so the "should never happen" case happened...
+ String msg = "Found lock in unexpected state " + info; + LOG.error(msg); + throw new MetaException(msg); + } + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "unlock(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + unlock(rqst); + } + } + + /** + * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse} + */ + private static class LockInfoExt extends LockInfo { + private final ShowLocksResponseElement e; + LockInfoExt(ShowLocksResponseElement e) { + super(e); + this.e = e; + } + } + public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { + try { + Connection dbConn = null; + ShowLocksResponse rsp = new ShowLocksResponse(); + List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>(); + List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>(); + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + + "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS"; + + // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query. + String dbName = rqst.getDbname(); + String tableName = rqst.getTablename(); + String partName = rqst.getPartname(); + + StringBuilder filter = new StringBuilder(); + if (dbName != null && !dbName.isEmpty()) { + filter.append("hl_db=").append(quoteString(dbName)); + } + if (tableName != null && !tableName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_table=").append(quoteString(tableName)); + } + if (partName != null && !partName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_partition=").append(quoteString(partName)); + } + String whereClause = filter.toString(); + + if (!whereClause.isEmpty()) { + s = s + " where " + whereClause; + } + + LOG.debug("Doing to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowLocksResponseElement e = new ShowLocksResponseElement(); + e.setLockid(rs.getLong(1)); + long txnid = rs.getLong(2); + if (!rs.wasNull()) e.setTxnid(txnid); + e.setDbname(rs.getString(3)); + e.setTablename(rs.getString(4)); + String partition = rs.getString(5); + if (partition != null) e.setPartname(partition); + switch (rs.getString(6).charAt(0)) { + case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; + case LOCK_WAITING: e.setState(LockState.WAITING); break; + default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); + } + switch (rs.getString(7).charAt(0)) { + case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; + case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; + case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; + default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); + } + e.setLastheartbeat(rs.getLong(8)); + long acquiredAt = rs.getLong(9); + if (!rs.wasNull()) e.setAcquiredat(acquiredAt); + e.setUser(rs.getString(10)); + 
e.setHostname(rs.getString(11)); + e.setLockIdInternal(rs.getLong(12)); + long id = rs.getLong(13); + if(!rs.wasNull()) { + e.setBlockedByExtId(id); + } + id = rs.getLong(14); + if(!rs.wasNull()) { + e.setBlockedByIntId(id); + } + sortedList.add(new LockInfoExt(e)); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + checkRetryable(dbConn, e, "showLocks(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined + //by checkLock() - makes diagnostics easier. + Collections.sort(sortedList, new LockInfoComparator()); + for(LockInfoExt lockInfoExt : sortedList) { + elems.add(lockInfoExt.e); + } + rsp.setLocks(elems); + return rsp; + } catch (RetryException e) { + return showLocks(rqst); + } + } + + /** + * {@code ids} should only have txnid or lockid but not both, ideally. + * Currently DBTxnManager.heartbeat() enforces this. + */ + public void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + try { + Connection dbConn = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + heartbeatLock(dbConn, ids.getLockid()); + heartbeatTxn(dbConn, ids.getTxnid()); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeat(" + ids + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + heartbeat(ids); + } + } + + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException { + try { + Connection dbConn = null; + HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); + Set<Long> nosuch = new HashSet<Long>(); + Set<Long> aborted = new HashSet<Long>(); + rsp.setNosuch(nosuch); + rsp.setAborted(aborted); + try { + /** + * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)} + * only has 1 update statement in it and + * we only update existing txns, i.e. 
nothing can add additional txns that this operation + * would care about (which would have required SERIALIZABLE) + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { + try { + //todo: do all updates in 1 SQL statement and check update count + //if update count is less than was requested, go into more expensive checks + //for each txn + heartbeatTxn(dbConn, txn); + } catch (NoSuchTxnException e) { + nosuch.add(txn); + } catch (TxnAbortedException e) { + aborted.add(txn); + } + } + return rsp; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + return heartbeatTxnRange(rqst); + } + } + + long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { + // Get the id for the next entry in the queue + String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + LOG.debug("going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new IllegalStateException("Transaction tables not properly initiated, " + + "no record found in next_compaction_queue_id"); + } + long id = rs.getLong(1); + s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + return id; + } + public long compact(CompactionRequest rqst) throws MetaException { + // Put a compaction request in the queue. + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + long id = generateCompactionQueueId(stmt); + + StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + + "cq_table, "); + String partName = rqst.getPartitionname(); + if (partName != null) buf.append("cq_partition, "); + buf.append("cq_state, cq_type"); + if (rqst.getRunas() != null) buf.append(", cq_run_as"); + buf.append(") values ("); + buf.append(id); + buf.append(", '"); + buf.append(rqst.getDbname()); + buf.append("', '"); + buf.append(rqst.getTablename()); + buf.append("', '"); + if (partName != null) { + buf.append(partName); + buf.append("', '"); + } + buf.append(INITIATED_STATE); + buf.append("', '"); + switch (rqst.getType()) { + case MAJOR: + buf.append(MAJOR_TYPE); + break; + + case MINOR: + buf.append(MINOR_TYPE); + break; + + default: + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Unexpected compaction type " + rqst.getType().toString()); + } + if (rqst.getRunas() != null) { + buf.append("', '"); + buf.append(rqst.getRunas()); + } + buf.append("')"); + String s = buf.toString(); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + return id; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "compact(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return compact(rqst); + } + } + + public 
ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { + ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>()); + Connection dbConn = null; + Statement stmt = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + + "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " + + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " + + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; + //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013) + //to sort so that currently running jobs are at the end of the list (bottom of screen) + //and currently running ones are in sorted by start time + //w/o order by likely currently running compactions will be first (LHS of Union) + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowCompactResponseElement e = new ShowCompactResponseElement(); + e.setDbname(rs.getString(1)); + e.setTablename(rs.getString(2)); + e.setPartitionname(rs.getString(3)); + switch (rs.getString(4).charAt(0)) { + case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; + case WORKING_STATE: e.setState(WORKING_RESPONSE); break; + case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; + case FAILED_STATE: e.setState(FAILED_RESPONSE); break; + case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break; + case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break; + default: + //do nothing to handle RU/D if we add another status + } + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; + case MINOR_TYPE: e.setType(CompactionType.MINOR); break; + default: + //do nothing to handle RU/D if we add another status + } + e.setWorkerid(rs.getString(6)); + e.setStart(rs.getLong(7)); + long endTime = rs.getLong(8); + if(endTime != -1) { + e.setEndTime(endTime); + } + e.setRunAs(rs.getString(9)); + e.setHadoopJobId(rs.getString(10)); + long id = rs.getLong(11);//for debugging + response.addToCompacts(e); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "showCompact(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + return response; + } catch (RetryException e) { + return showCompact(rqst); + } + } + + private static void shouldNeverHappen(long txnid) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid)); + } + private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " " + + JavaUtils.lockIdToString(extLockId) + " " + intLockId); + } + + public void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet rs = null; + try { + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = 
+ + public void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet rs = null; + try { + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if(lockHandle == null) { + //ensures txn is still there and in expected state + ensureValidTxn(dbConn, rqst.getTxnid(), stmt); + shouldNeverHappen(rqst.getTxnid()); + } + //we should be able to get this from the AddDynamicPartitions object longer term; in fact we'd have to + //for multi-statement txns if the same table is written more than once per txn. + // MoveTask knows if it's an insert/update/delete; + // MoveTask calls Hive.loadDynamicPartitions(), which calls HiveMetaStoreClient.addDynamicPartitions(), + // which ends up here, so we'd need to add a field to AddDynamicPartitions. + String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); + //do limit 1 on this; currently all rows for the txn/table will have the same operation type + rs = stmt.executeQuery(addLimitClause(1, findOperationType)); + if(!rs.next()) { + throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid())); + } + OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0)); + + //what if a txn writes the same table > 1 time... let's go with this for now, but really we + //need to not write this in the first place, i.e. make this delete unnecessary + //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS + String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" + + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); + //we delete the entries made by enqueueLockWithRetry() since those are based on lock information, which is + //much "wider" than necessary in a lot of cases. Here, on the other hand, we know exactly which + //partitions have been written to. Without this delete, WRITE_SET would contain entries for partitions not actually + //written to
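+ // For illustration (values hypothetical): if txn 42 wrote partitions ds=1 and ds=2 of db1.t1 under a + // table-level lock, the delete below removes the single lock-derived row (42, 'db1', 't1', NULL, 'u') + // and the insert loop that follows replaces it with the partition-precise rows + // (42, 'db1', 't1', 'ds=1', 'u') and (42, 'db1', 't1', 'ds=2', 'u').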
+ stmt.executeUpdate(deleteSql); + for (String partName : rqst.getPartitionnames()) { + String s = + "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" + + rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")"); + throw new MetaException("Unable to insert into transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs); + close(lockHandle, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + addDynamicPartitions(rqst); + } + }
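The per-partition loop above assembles one INSERT string per partition. A parameterized, batched variant is sketched below for comparison; this is not how TxnHandler builds its SQL (it concatenates literals through quoteString/quoteChar), just an illustration of the same effect:

    // Hypothetical alternative to the string-building loop in addDynamicPartitions().
    String sql = "insert into TXN_COMPONENTS "
        + "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (?, ?, ?, ?, ?)";
    try (PreparedStatement ps = dbConn.prepareStatement(sql)) {
      for (String partName : rqst.getPartitionnames()) {
        ps.setLong(1, rqst.getTxnid());
        ps.setString(2, rqst.getDbname());
        ps.setString(3, rqst.getTablename());
        ps.setString(4, partName);
        ps.setString(5, String.valueOf(ot.sqlConst)); // single-char operation code, as in quoteChar(ot.sqlConst)
        ps.addBatch();
      }
      ps.executeBatch(); // one round trip for all partitions
    }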
+ + /** + * Clean up corresponding records in metastore tables, specifically: + * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + */ + public void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + + try { + String dbName; + String tblName; + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<String> queries = new ArrayList<String>(); + StringBuilder buff = new StringBuilder(); + + switch (type) { + case DATABASE: + dbName = db.getName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case TABLE: + dbName = table.getDbName(); + tblName = table.getTableName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case PARTITION: + dbName = table.getDbName(); + tblName = table.getTableName(); + List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns + List<String> partVals; // partition values + String partName; + + while (partitionIterator.hasNext()) { + Partition p = partitionIterator.next(); + partVals = p.getValues(); + partName = Warehouse.makePartName(partCols, partVals); + + buff.setLength(0); // reset for each partition; otherwise queries from the previous iteration leak into this one + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("' and tc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("' and ctc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("' and cq_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("' and cc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + } + + break; + default: + throw new MetaException("Invalid object type for cleanup: " + type); + } + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + stmt.executeUpdate(query); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanupRecords"); + if (e.getMessage() != null && e.getMessage().contains("does not exist")) { + LOG.warn("Cannot perform cleanup since metastore table does not exist"); + } else { + throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e)); + } + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + cleanupRecords(type, db, table, partitionIterator); + } + } + + /** + * For testing only, do not use. + */ + @VisibleForTesting + public int numLocksInLockTable() throws SQLException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from HIVE_LOCKS"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + rs.next(); + int rc = rs.getInt(1); + // Necessary to clean up the transaction in the db. + dbConn.rollback(); + return rc; + } finally { + close(rs, stmt, dbConn); + } + } + + /** + * For testing only, do not use. + */ + public long setTimeout(long milliseconds) { + long previous_timeout = timeout; + timeout = milliseconds; + return previous_timeout; + } + + protected class RetryException extends Exception { + + }
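A caller-side sketch of cleanupRecords() for a dropped table. txnHandler is an assumed TxnHandler instance; per the switch above, the Database argument is only read for DATABASE cleanups and the partition iterator only for PARTITION cleanups, so both may be null here:

    // Hypothetical: remove transaction/compaction bookkeeping for db1.t1 after a DROP TABLE.
    Table t = new Table();
    t.setDbName("db1");
    t.setTableName("t1");
    txnHandler.cleanupRecords(HiveObjectType.TABLE, null, t, null);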
+ + protected Connection getDbConn(int isolationLevel) throws SQLException { + int rc = doRetryOnConnPool ? 10 : 1; + Connection dbConn = null; + while (true) { + try { + dbConn = connPool.getConnection(); + dbConn.setAutoCommit(false); + dbConn.setTransactionIsolation(isolationLevel); + return dbConn; + } catch (SQLException e) { + closeDbConn(dbConn); + if ((--rc) <= 0) throw e; + LOG.error("There is a problem with a connection from the pool, retrying (rc=" + rc + "): " + + getMessage(e), e); + } + } + } + + static void rollbackDBConn(Connection dbConn) { + try { + if (dbConn != null && !dbConn.isClosed()) dbConn.rollback(); + } catch (SQLException e) { + LOG.warn("Failed to rollback db connection " + getMessage(e)); + } + } + protected static void closeDbConn(Connection dbConn) { + try { + if (dbConn != null && !dbConn.isClosed()) { + dbConn.close(); + } + } catch (SQLException e) { + LOG.warn("Failed to close db connection " + getMessage(e)); + } + } + + /** + * Close statement instance. + * @param stmt statement instance. + */ + protected static void closeStmt(Statement stmt) { + try { + if (stmt != null && !stmt.isClosed()) stmt.close(); + } catch (SQLException e) { + LOG.warn("Failed to close statement " + getMessage(e)); + } + } + + /** + * Close the ResultSet. + * @param rs may be {@code null} + */ + static void close(ResultSet rs) { + try { + if (rs != null && !rs.isClosed()) { + rs.close(); + } + } + catch(SQLException ex) { + LOG.warn("Failed to close result set " + getMessage(ex)); + } + } + + /** + * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn} + */ + static void close(ResultSet rs, Statement stmt, Connection dbConn) { + close(rs); + closeStmt(stmt); + closeDbConn(dbConn); + } + /** + * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do + * this, so we have to inspect the error messages and catch the telltale signs for each + * different database. This method will throw {@code RetryException} + * if the error is retryable. + * @param conn database connection + * @param e exception that was thrown. + * @param caller name of the method calling this (and other info useful to log) + * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried + */ + protected void checkRetryable(Connection conn, + SQLException e, + String caller) throws RetryException, MetaException { + + // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() + // to test these changes. + // MySQL and MSSQL use 40001 as the state code for rollback. Postgres uses 40001 and 40P01. + // Oracle seems to return different SQLStates and messages each time, + // so I've tried to capture the different error messages (there appear to be fewer different + // error messages than SQL states).
+ // Derby and newer MySQL drivers use the new SQLTransactionRollbackException + boolean sendRetrySignal = false; + try { + if(dbProduct == null) { + throw new IllegalStateException("DB Type not determined yet."); + } + if (e instanceof SQLTransactionRollbackException || + ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || + dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || + (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || + (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") + || e.getMessage().contains("can't serialize access for this transaction")))) { + if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { + long waitInterval = deadlockRetryInterval * deadlockCnt; + LOG.warn("Deadlock detected in " + caller + ". Will wait " + waitInterval + + "ms and try again up to " + (ALLOWED_REPEATED_DEADLOCKS - deadlockCnt + 1) + " times."); + // Pause for just a bit before retrying to avoid immediately jumping back into the deadlock. + try { + Thread.sleep(waitInterval); + } catch (InterruptedException ie) { + // NOP + } + sendRetrySignal = true; + } else { + LOG.error("Too many repeated deadlocks in " + caller + ", giving up."); + } + } else if (isRetryable(conf, e)) { + //in MSSQL this means Communication Link Failure + if (retryNum++ < retryLimit) { + LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval + + "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e)); + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ex) { + // NOP + } + sendRetrySignal = true; + } else { + LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); + } + } + else { + //make sure we know we saw an error that we don't recognize + LOG.info("Non-retryable error: " + getMessage(e)); + } + } + finally { + /*if this method ends with anything except a retry signal, the caller should fail the operation + and propagate the error up to its caller (the Metastore client); thus we must reset the retry counters*/ + if(!sendRetrySignal) { + deadlockCnt = 0; + retryNum = 0; + } + } + if(sendRetrySignal) { + throw new RetryException(); + } + } + + /** + * Determine the current time, using the RDBMS as a source of truth + * @param conn database connection + * @return current time in milliseconds + * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined + */ + protected long getDbTime(Connection conn) throws MetaException { + Statement stmt = null; + try { + stmt = conn.createStatement(); + String s; + switch (dbProduct) { + case DERBY: + s = "values current_timestamp"; + break; + + case MYSQL: + case POSTGRES: + case SQLSERVER: + s = "select current_timestamp"; + break; + + case ORACLE: + s = "select current_timestamp from dual"; + break; + + default: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) throw new MetaException("No results from date query"); + return rs.getTimestamp(1).getTime(); + } catch (SQLException e) { + String msg = "Unable to determine current time: " + e.getMessage(); + LOG.error(msg); + throw new MetaException(msg); + } finally { + closeStmt(stmt); + } + }
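checkRetryable() above backs off linearly on deadlock: each attempt waits deadlockRetryInterval * deadlockCnt. A standalone sketch of the schedule; the values below are illustrative, while the real interval and ALLOWED_REPEATED_DEADLOCKS are TxnHandler fields configured elsewhere:

    // Hypothetical values, for illustration only.
    long deadlockRetryInterval = 1000; // ms
    int allowedRepeatedDeadlocks = 10;
    for (int deadlockCnt = 1; deadlockCnt <= allowedRepeatedDeadlocks; deadlockCnt++) {
      long waitInterval = deadlockRetryInterval * deadlockCnt;
      System.out.println("attempt " + deadlockCnt + ": wait " + waitInterval + "ms");
    }
    // With these numbers the waits are 1s, 2s, ... 10s before the handler gives up
    // and lets the SQLException propagate as a MetaException.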
+ + /** + * Determine the String that should be used to quote identifiers. + * @param conn Active connection + * @return the identifier quote string + * @throws SQLException + */ + protected String getIdentifierQuoteString(Connection conn) throws SQLException { + if (identifierQuoteString == null) { + identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); + } + return identifierQuoteString; + } + + protected enum DatabaseProduct { DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER } + + /** + * Determine the database product type + * @param conn database connection + * @return database product type + */ + private DatabaseProduct determineDatabaseProduct(Connection conn) { + if (dbProduct == null) { + try { + String s = conn.getMetaData().getDatabaseProductName(); + if (s == null) { + String msg = "getDatabaseProductName returns null, can't determine database product"; + LOG.error(msg); + throw new IllegalStateException(msg); + } else if (s.equals("Apache Derby")) { + dbProduct = DatabaseProduct.DERBY; + } else if (s.equals("Microsoft SQL Server")) { + dbProduct = DatabaseProduct.SQLSERVER; + } else if (s.equals("MySQL")) { + dbProduct = DatabaseProduct.MYSQL; + } else if (s.equals("Oracle")) { + dbProduct = DatabaseProduct.ORACLE; + } else if (s.equals("PostgreSQL")) { + dbProduct = DatabaseProduct.POSTGRES; + } else { + String msg = "Unrecognized database product name <" + s + ">"; + LOG.error(msg); + throw new IllegalStateException(msg); + } + + } catch (SQLException e) { + String msg = "Unable to get database product name: " + e.getMessage(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + return dbProduct; + } + + private static class LockInfo { + private final long extLockId; + private final long intLockId; + //0 means there is no transaction, i.e. it is a select statement which is not part of an + //explicit transaction, or an IUD statement that is not writing to an ACID table + private final long txnId; + private final String db; + private final String table; + private final String partition; + private final LockState state; + private final LockType type; + + // Assumes the result set is set to a valid row + LockInfo(ResultSet rs) throws SQLException, MetaException { + extLockId = rs.getLong("hl_lock_ext_id"); // can't be null + intLockId = rs.getLong("hl_lock_int_id"); // can't be null + db = rs.getString("hl_db"); // can't be null + String t = rs.getString("hl_table"); + table = (rs.wasNull() ? null : t); + String p = rs.getString("hl_partition"); + partition = (rs.wasNull() ? 
null : p); + switch (rs.getString("hl_lock_state").charAt(0)) { + case LOCK_WAITING: state = LockState.WAITING; break; + case LOCK_ACQUIRED: state = LockState.ACQUIRED; break; + default: + throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0)); + } + switch (rs.getString("hl_lock_type").charAt(0)) { + case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break; + case LOCK_SHARED: type = LockType.SHARED_READ; break; + case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break; + default: + throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); + } + txnId = rs.getLong("hl_txnid"); //returns 0 if value is NULL + } + LockInfo(ShowLocksResponseElement e) { + extLockId = e.getLockid(); + intLockId = e.getLockIdInternal(); + txnId = e.getTxnid(); + db = e.getDbname(); + table = e.getTablename(); + partition = e.getPartname(); + state = e.getState(); + type = e.getType(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof LockInfo)) return false; + LockInfo o = (LockInfo)other; + // Lock ids are unique across the system. + return extLockId == o.extLockId && intLockId == o.intLockId; + } + + @Override + public String toString() { + return JavaUtils.lockIdToString(extLockId) + " intLockId:" + + intLockId + " " + JavaUtils.txnIdToString(txnId) + + " db:" + db + " table:" + table + " partition:" + + partition + " state:" + (state == null ? "null" : state.toString()) + + " type:" + (type == null ? "null" : type.toString()); + } + private boolean isDbLock() { + return db != null && table == null && partition == null; + } + private boolean isTableLock() { + return db != null && table != null && partition == null; + } + } + + private static class LockInfoComparator implements Comparator<LockInfo> { + private static final LockTypeComparator lockTypeComparator = new LockTypeComparator(); + @Override + public boolean equals(Object other) { + return this == other; + } + + public int compare(LockInfo info1, LockInfo info2) { + // We sort by state (acquired vs waiting), then by LockType, then by id + if (info1.state == LockState.ACQUIRED && + info2.state != LockState.ACQUIRED) { + return -1; + } + if (info1.state != LockState.ACQUIRED && + info2.state == LockState.ACQUIRED) { + return 1; + } + + int sortByType = lockTypeComparator.compare(info1.type, info2.type); + if(sortByType != 0) { + return sortByType; + } + if (info1.extLockId < info2.extLockId) { + return -1; + } else if (info1.extLockId > info2.extLockId) { + return 1; + } else { + if (info1.intLockId < info2.intLockId) { + return -1; + } else i
<TRUNCATED>
