http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java new file mode 100644 index 0000000..7087b03 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -0,0 +1,3643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.dbcp.ConnectionFactory; +import org.apache.commons.dbcp.DriverManagerConnectionFactory; +import org.apache.commons.dbcp.PoolableConnectionFactory; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.RetrySemantics; +import org.apache.hadoop.hive.metastore.DatabaseProduct; +import org.apache.hadoop.hive.metastore.RunnableConfigurable; +import org.apache.hadoop.hive.metastore.ThreadPool; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.datasource.BoneCPDataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.HikariCPDataSourceProvider; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.StringableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.dbcp.PoolingDataSource; + +import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.util.StringUtils; + +import javax.sql.DataSource; + +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.sql.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +/** + * A handler to answer transaction related calls that come into the metastore + * server. + * + * Note on log messages: Please include txnid:X and lockid info using + * {@link JavaUtils#txnIdToString(long)} + * and {@link JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. + * For locks that are part of transaction, we set this 0 (would rather set it to NULL but + * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding + * transaction in TXNS. + * + * In general there can be multiple metastores where this logic can execute, thus the DB is + * used to ensure proper mutexing of operations. + * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is + * used to properly sequence operations. Most notably: + * 1. various sequence IDs are generated with aid of this mutex + * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state + * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all + * per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row. + * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks. + * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock + * can be granted, no other (strictly speaking "earlier") lock can change state. + * + * The exception to this is Derby which doesn't support proper S4U.
Derby is always running embedded + * (this is the only supported configuration for Derby) + * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to properly sequence the operations. + * + * {@link #derbyLock} + + * If we ever decide to run remote Derby server, according to + * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be + * serialized, so that would also work though has not been tested. + * + * General design note: + * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is + * still valid and active. In the code this is usually achieved at the same time the txn record + * is locked for some operation. + * + * Note on retry logic: + * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}. The retry logic there is very + * generic and is not aware whether the operations are idempotent or not. (This is separate from + * retry logic here in TxnHandler which can/does retry DB errors intelligently). The worst case is + * when an op here issues a successful commit against the RDBMS but the calling stack doesn't + * receive the ack and retries. (If an op fails before commit, it's trivially idempotent) + * Thus the ops here need to be made idempotent as much as possible or + * the metastore call stack should have logic not to retry. There are {@link RetrySemantics} + * annotations to document the behavior.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { + + static final protected char INITIATED_STATE = 'i'; + static final protected char WORKING_STATE = 'w'; + static final protected char READY_FOR_CLEANING = 'r'; + static final char FAILED_STATE = 'f'; + static final char SUCCEEDED_STATE = 's'; + static final char ATTEMPTED_STATE = 'a'; + + // Compactor types + static final protected char MAJOR_TYPE = 'a'; + static final protected char MINOR_TYPE = 'i'; + + // Transaction states + static final protected char TXN_ABORTED = 'a'; + static final protected char TXN_OPEN = 'o'; + //todo: make these like OperationType and remove above char constants + enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} + + // Lock states + static final protected char LOCK_ACQUIRED = 'a'; + static final protected char LOCK_WAITING = 'w'; + + // Lock types + static final protected char LOCK_EXCLUSIVE = 'e'; + static final protected char LOCK_SHARED = 'r'; + static final protected char LOCK_SEMI_SHARED = 'w'; + + static final private int ALLOWED_REPEATED_DEADLOCKS = 10; + static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); + + static private DataSource connPool; + private static DataSource connPoolMutex; + static private boolean doRetryOnConnPool = false; + + private enum OpertaionType { + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'); + private final char sqlConst; + OpertaionType(char sqlConst) { + this.sqlConst = sqlConst; + } + public String toString() { + return Character.toString(sqlConst); + } + public static OpertaionType fromString(char sqlConst) { + switch (sqlConst) { + case 's': + return SELECT; + case 'i': + return INSERT; + case 'u': + return UPDATE; + case 'd': + return DELETE; + default: + throw new IllegalArgumentException(quoteChar(sqlConst)); + } + } + public static OpertaionType fromDataOperationType(DataOperationType dop) { + switch (dop) { + case SELECT: + return
OpertaionType.SELECT; + case INSERT: + return OpertaionType.INSERT; + case UPDATE: + return OpertaionType.UPDATE; + case DELETE: + return OpertaionType.DELETE; + default: + throw new IllegalArgumentException("Unexpected value: " + dop); + } + } + } + + // Maximum number of open transactions that's allowed + private static volatile int maxOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static volatile boolean tooManyOpenTxns = false; + + /** + * Number of consecutive deadlocks we have seen + */ + private int deadlockCnt; + private long deadlockRetryInterval; + protected Configuration conf; + private static DatabaseProduct dbProduct; + private static SQLGenerator sqlGenerator; + + // (End user) Transaction timeout, in milliseconds. + private long timeout; + + private String identifierQuoteString; // quotes to use for quoting tables, where necessary + private long retryInterval; + private int retryLimit; + private int retryNum; + // Current number of open txns + private AtomicInteger numOpenTxns; + + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); + /** + * must be static since even in UT there may be > 1 instance of TxnHandler + * (e.g. via Compactor services) + */ + private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); + private static final String hostname = JavaUtils.hostname(); + + // Private methods should never catch SQLException and then throw MetaException. The public + // methods depend on SQLException coming back so they can detect and handle deadlocks. Private + // methods should only throw MetaException when they explicitly know there's a logic error and + // they want to throw past the public methods. + // + // All public methods that write to the database have to check for deadlocks when a SQLException + // comes back and handle it if they see one. 
This has to be done with the connection pooling + // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction, + // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. + + public TxnHandler() { + } + + /** + * This is logically part of c'tor and must be called prior to any other method. + * Not physically part of c'tor due to use of reflection + */ + public void setConf(Configuration conf) { + this.conf = conf; + + checkQFileTestHack(); + + synchronized (TxnHandler.class) { + if (connPool == null) { + Connection dbConn = null; + // Set up the JDBC connection pool + try { + int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS); + long getConnectionTimeoutMs = 30000; + connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs); + /*the mutex pools should ideally be somewhat larger since some operations require 1 + connection from each pool and we want to avoid taking a connection from primary pool + and then blocking because mutex pool is empty. There is only 1 thread in any HMS trying + to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock. The CheckLock operation gets a + connection from connPool first, then connPoolMutex. All others, go in the opposite + order (not very elegant...). 
So number of connection requests for connPoolMutex cannot + exceed (size of connPool + MUTEX_KEY.values().length - 1).*/ + connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); + sqlGenerator = new SQLGenerator(dbProduct, conf); + } catch (SQLException e) { + String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); + LOG.error(msg); + throw new RuntimeException(e); + } finally { + closeDbConn(dbConn); + } + } + } + + numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS); + + timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); + buildJumpTable(); + retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMSHANDLERINTERVAL, + TimeUnit.MILLISECONDS); + retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMSHANDLERATTEMPTS); + deadlockRetryInterval = retryInterval / 10; + maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { + try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This method can run at READ_COMMITTED as long as long as + * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. + * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with + * adding corresponding entries into TXNS. 
The reason is that any txnid below HWM + * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + List<TxnInfo> txnInfos = new ArrayList<>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " + + "TXNS where txn_id <= " + hwm; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + char c = rs.getString(2).charAt(0); + TxnState state; + switch (c) { + case TXN_ABORTED: + state = TxnState.ABORTED; + break; + + case TXN_OPEN: + state = TxnState.OPEN; + break; + + default: + throw new MetaException("Unexpected transaction state " + c + + " found in txns table"); + } + TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)); + txnInfo.setStartedTime(rs.getLong(5)); + txnInfo.setLastHeartbeatTime(rs.getLong(6)); + txnInfos.add(txnInfo); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + return new GetOpenTxnsInfoResponse(hwm, txnInfos); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxnsInfo"); + throw new MetaException("Unable to select from transaction database: " + getMessage(e) + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return 
getOpenTxnsInfo(); + } + } + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsResponse getOpenTxns() throws MetaException { + try { + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + + "initialized, null record found in next_txn_id"); + } + close(rs); + List<Long> openList = new ArrayList<>(); + //need the WHERE clause below to ensure consistent results with READ_COMMITTED + s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + long minOpenTxn = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + while (rs.next()) { + long txnId = rs.getLong(1); + openList.add(txnId); + char c = rs.getString(2).charAt(0); + if(c == TXN_OPEN) { + minOpenTxn = Math.min(minOpenTxn, txnId); + } else if (c == TXN_ABORTED) { + abortedBits.set(openList.size() - 1); + } + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); + if(minOpenTxn < 
Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxns"); + throw new MetaException("Unable to select from transaction database, " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return getOpenTxns(); + } + } + + /** + * Retry-by-caller note: + * Worst case, it will leave an open txn which will timeout. + */ + @Override + @RetrySemantics.Idempotent + public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (!tooManyOpenTxns && numOpenTxns.get() >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns.get() < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + + int numTxns = rqst.getNum_txns(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + lockInternal(); + /** + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. + * Also, advancing the counter must work when multiple metastores are running. + * SELECT ... FOR UPDATE is used to prevent + * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * + * In the current design, there can be several metastore instances running in a given Warehouse. + * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. 
+ * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will thing the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This + * set could support a write-through cache for added performance. + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + // Make sure the user has not requested an insane amount of txns. + int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH); + if (numTxns > maxTxns) numTxns = maxTxns; + + stmt = dbConn.createStatement(); + String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + long first = rs.getLong(1); + s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + long now = getDbTime(dbConn); + List<Long> txnIds = new ArrayList<>(numTxns); + + List<String> rows = new ArrayList<>(); + for (long i = first; i < first + numTxns; i++) { + txnIds.add(i); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname())); + } + List<String> queries = sqlGenerator.createInsertValuesStmt( + "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows); + for (String q : queries) { + LOG.debug("Going to execute update <" + q + ">"); + stmt.execute(q); + } + LOG.debug("Going to commit"); + dbConn.commit(); + return new OpenTxnsResponse(txnIds); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + 
checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return openTxns(rqst); + } + } + @Override + @RetrySemantics.Idempotent + public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { + long txnid = rqst.getTxnid(); + try { + Connection dbConn = null; + Statement stmt = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + stmt = dbConn.createStatement(); + TxnStatus status = findTxnState(txnid,stmt); + if(status == TxnStatus.ABORTED) { + LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) + + ") requested by it is already " + TxnStatus.ABORTED); + return; + } + raiseTxnUnexpectedState(status, txnid); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + abortTxn(rqst); + } + } + @Override + @RetrySemantics.Idempotent + public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { + List<Long> txnids = rqst.getTxn_ids(); + try { + Connection dbConn = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + int numAborted = abortTxns(dbConn, txnids, false); + if (numAborted != txnids.size()) { + LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + + txnids.size() + " transactions. 
It's possible that the other " + + (txnids.size() - numAborted) + + " transactions have been aborted or committed, or the transaction ids are invalid."); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxns(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + abortTxns(rqst); + } + } + + /** + * Concurrency/isolation notes: + * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)} + * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNX table for specific txnid:X + * see more notes below. + * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn + * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence + * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of + * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. + * + * Motivating example: + * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1 + * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot + * that they read appropriately. In particular, if txns do not overlap, then one follows the other + * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. We ensure + * this by locking in snapshot after + * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn) + * and mutexing openTxn() with commit(). 
In other words, once a S.commit() starts we must ensure + * that txn T which will be considered a later txn, locks in a snapshot that includes the result + * of S's commit (assuming no other txns). + * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions + * were running in parallel). If T and S both locked in the same snapshot (for example commit of + * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) + * 'x' would be updated to the same value by both, i.e. lost update. + */ + @Override + @RetrySemantics.Idempotent("No-op if already committed") + public void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + long txnid = rqst.getTxnid(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet commitIdRs = null, rs; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + /** + * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other + * operation can change this txn (such acquiring locks). While lock() and commitTxn() + * should not normally run concurrently (for same txn) but could due to bugs in the client + * which could then corrupt internal transaction manager state. Also competes with abortTxn(). 
+ */ + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if (lockHandle == null) { + //if here, txn was not found (in expected state) + TxnStatus actualTxnStatus = findTxnState(txnid, stmt); + if(actualTxnStatus == TxnStatus.COMMITTED) { + /** + * This makes the operation idempotent + * (assume that this is most likely due to retry logic) + */ + LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg"); + return; + } + raiseTxnUnexpectedState(actualTxnStatus, txnid); + shouldNeverHappen(txnid); + //dbConn is rolled back in finally{} + } + String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; + rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); + if (rs.next()) { + close(rs); + //if here it means currently committing txn performed update/delete and we should check WW conflict + /** + * This S4U will mutex with other commitTxn() and openTxns(). + * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial + * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start + * at the same time and no new txns start until all 3 commit. + * We could've incremented the sequence for commitId is well but it doesn't add anything functionally. + */ + commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID")); + if (!commitIdRs.next()) { + throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + } + long commitId = commitIdRs.getLong(1); + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + /** + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. 
if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it my create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of it's columns + */ + int numCompsWritten = stmt.executeUpdate( + "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); + /** + * see if there are any overlapping txns wrote the same element, i.e. have a conflict + * Since entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns + * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap. 
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + rs = stmt.executeQuery + (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," + + "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " + + "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " + + "from WRITE_SET committed INNER JOIN WRITE_SET cur " + + "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " + + //For partitioned table we always track writes at partition level (never at table) + //and for non partitioned - always at table level, thus the same table should never + //have entries with partition key and w/o + "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " + + "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commitTxn() op + " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns + //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all + " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + + " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")")); + if (rs.next()) { + //found a conflict + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if (partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + + " 
committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8); + close(rs); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + close(null, stmt, dbConn); + throw new TxnAbortedException(msg); + } else { + //no conflicting operations, proceed with the rest of commit sequence + } + } + else { + /** + * current txn didn't update/delete anything (may have inserted), so just proceed with commit + * + * We only care about commit id for write txns, so for RO (when supported) txns we don't + * have to mutex on NEXT_TXN_ID. + * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's + * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn. + * If RO < W, then there is no reads-from relationship. + */ + } + // Move the record from txn_components into completed_txn_components so that the compactor + // knows where to look to compact. + String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute insert <" + s + ">"); + int modCount = 0; + if ((modCount = stmt.executeUpdate(s)) < 1) { + //this can be reasonable for an empty txn START/COMMIT or read-only txn + //also an IUD with DP that didn't match any rows. + LOG.info("Expected to move at least one record from txn_components to " + + "completed_txn_components when committing txn! 
" + JavaUtils.txnIdToString(txnid)); + } + s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + modCount = stmt.executeUpdate(s); + s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + modCount = stmt.executeUpdate(s); + s = "delete from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + modCount = stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "commitTxn(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(commitIdRs); + close(lockHandle, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + commitTxn(rqst); + } + } + @Override + @RetrySemantics.SafeToRetry + public void performWriteSetGC() { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + if(!rs.next()) { + throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); + } + long highestAllocatedTxnId = rs.getLong(1); + close(rs); + rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); + if(!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark + long lowestOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + //if here then there are no Open txns and highestAllocatedTxnId must be + //resolved (i.e. committed or aborted), either way + //there are no open txns with id <= highestAllocatedTxnId + //the +1 is there because "delete ..." 
below has < (which is correct for the case when + //there is an open txn + //Concurrency: even if new txn starts (or starts + commits) it is still true that + //there are no currently open txns that overlap with any committed txn with + //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. + commitHighWaterMark = highestAllocatedTxnId + 1; + } + else { + commitHighWaterMark = lowestOpenTxnId; + } + int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); + LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET"); + dbConn.commit(); + } catch (SQLException ex) { + LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); + } + finally { + close(rs, stmt, dbConn); + } + } + /** + * As much as possible (i.e. in absence of retries) we want both operations to be done on the same + * connection (but separate transactions). This avoid some flakiness in BONECP where if you + * perform an operation on 1 connection and immediately get another from the pool, the 2nd one + * doesn't see results of the first. + * + * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case + * there will be a duplicate set of locks but both sets will belong to the same txn so they + * will not conflict with each other. For locks w/o txn context (i.e. read-only query), this + * may lead to deadlock (at least a long wait). (e.g. 1st call creates locks in {@code LOCK_WAITING} + * mode and response gets lost. Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * retries, and enqueues another set of locks in LOCK_WAITING. The 2nd LockResponse is delivered + * to the DbLockManager, which will keep dong {@link #checkLock(CheckLockRequest)} until the 1st + * set of locks times out. 
 */
  @RetrySemantics.CannotRetry
  public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
    //enqueue the locks in LOCK_WAITING state, then attempt to acquire them on the SAME
    //connection so the check is guaranteed to see the rows just inserted (see javadoc above)
    ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
    try {
      return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid());
    }
    catch(NoSuchLockException e) {
      // This should never happen, as we just added the lock id
      throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
    }
  }
  /**
   * Pairs the live {@link java.sql.Connection} on which a lock was enqueued with the external
   * lock id generated on that connection, so {@link #lock(LockRequest)} can run the follow-up
   * checkLock on the same connection.
   */
  private static final class ConnectionLockIdPair {
    private final Connection dbConn;
    private final long extLockId;
    private ConnectionLockIdPair(Connection dbConn, long extLockId) {
      this.dbConn = dbConn;
      this.extLockId = extLockId;
    }
  }

  /**
   * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read
   * and then executeUpdate().  One other alternative would be to actually update the row in TXNS but
   * to the same value as before thus forcing db to acquire write lock for duration of the transaction.
   *
   * There is no real reason to return the ResultSet here other than to make sure the reference to it
   * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock
   * to be released.
   * @param stmt statement to run the S4U query on; caller owns and closes it
   * @param txnId id of the TXNS row to lock
   * @param txnState the state this txn is expected to be in. may be null
   * @return null if no row was found (caller must then diagnose why); otherwise an OPEN ResultSet
   *         the caller must keep referenced for the lock scope and eventually close
   * @throws SQLException
   * @throws MetaException
   */
  private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
    ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query));
    if(rs.next()) {
      return rs;
    }
    //no matching row: release the (empty) cursor before reporting "not found"
    close(rs);
    return null;
  }

  /**
   * This enters locks into the queue in {@link #LOCK_WAITING} mode.
   *
   * Isolation Level Notes:
   * 1.
We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes + * any 2 {@code enqueueLockWithRetry()} calls. + * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations + * @see #checkLockWithRetry(Connection, long, long) + */ + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + boolean success = false; + Connection dbConn = null; + try { + Statement stmt = null; + ResultSet rs = null; + ResultSet lockHandle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long txnid = rqst.getTxnid(); + stmt = dbConn.createStatement(); + if (isValidTxn(txnid)) { + //this also ensures that txn is still there in expected state + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + } + /** Get the next lock id. + * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. 
Then 7 unblocks, + * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than one it's checking*/ + String s = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + if (txnid > 0) { + List<String> rows = new ArrayList<>(); + // For each component in this lock request, + // add an entry to the txn_components table + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetIsAcid() && !lc.isIsAcid()) { + //we don't prevent using non-acid resources in a txn but we do lock them + continue; + } + boolean updateTxnComponents; + if(!lc.isSetOperationType()) { + //request came from old version of the client + updateTxnComponents = true;//this matches old behavior + } + else { + switch (lc.getOperationType()) { + case INSERT: + case UPDATE: + case DELETE: + if(!lc.isSetIsDynamicPartitionWrite()) { + //must be old client talking, i.e. we don't know if it's DP so be conservative + updateTxnComponents = true; + } + else { + /** + * we know this is part of DP operation and so we'll get + * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list + * of partitions actually chaged. + */ + updateTxnComponents = !lc.isIsDynamicPartitionWrite(); + } + break; + case SELECT: + updateTxnComponents = false; + break; + case NO_TXN: + /*this constant is a bit of a misnomer since we now always have a txn context. 
It + just means the operation is such that we don't care what tables/partitions it + affected as it doesn't trigger a compaction or conflict detection. A better name + would be NON_TRANSACTIONAL.*/ + updateTxnComponents = false; + break; + default: + //since we have an open transaction, only 4 values above are expected + throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); + } + } + if(!updateTxnComponents) { + continue; + } + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + rows.add(txnid + ", '" + dbName + "', " + + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? "null" : "'" + partName + "'")+ "," + + quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())); + } + List<String> queries = sqlGenerator.createInsertValuesStmt( + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows); + for(String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int modCount = stmt.executeUpdate(query); + } + } + + List<String> rows = new ArrayList<>(); + long intLockId = 0; + for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //old version of thrift client should have (lc.isSetOperationType() == false) but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of the where the + //message was created (for object variables. It works correctly for boolean vars, e.g. LockComponent.isAcid). 
+ //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //we see UNSET here it means something didn't set the appropriate value. + throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } + intLockId++; + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + LockType lockType = lc.getType(); + char lockChar = 'z'; + switch (lockType) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + long now = getDbTime(dbConn); + rows.add(extLockId + ", " + intLockId + "," + txnid + ", " + + quoteString(dbName) + ", " + + valueOrNullLiteral(tblName) + ", " + + valueOrNullLiteral(partName) + ", " + + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + (isValidTxn(txnid) ? 
0 : now) + ", " + + valueOrNullLiteral(rqst.getUser()) + ", " + + valueOrNullLiteral(rqst.getHostname()) + ", " + + valueOrNullLiteral(rqst.getAgentInfo()));// + ")"; + } + List<String> queries = sqlGenerator.createInsertValuesStmt( + "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " + + "hl_table, hl_partition,hl_lock_state, hl_lock_type, " + + "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows); + for(String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int modCount = stmt.executeUpdate(query); + } + dbConn.commit(); + success = true; + return new ConnectionLockIdPair(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(lockHandle); + close(rs, stmt, null); + if (!success) { + /* This needs to return a "live" connection to be used by operation that follows it. + Thus it only closes Connection on failure/retry. 
 */
          closeDbConn(dbConn);
        }
        unlockInternal();
      }
    }
    catch(RetryException e) {
      //full retry of the enqueue; RetryException is raised by checkRetryable() for transient DB errors
      return enqueueLockWithRetry(rqst);
    }
  }
  /**
   * Runs {@link #checkLock(Connection, long)} for a lock just enqueued by
   * {@link #enqueueLockWithRetry(LockRequest)}, retrying on transient DB failures.
   * On first entry {@code dbConn} is the live connection handed over by the enqueue step;
   * on a retry it has been closed (see finally below), so a fresh one is obtained.
   * The connection is always closed here - this is the final step of lock().
   */
  private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
    try {
      try {
        lockInternal();
        if(dbConn.isClosed()) {
          //should only get here if retrying this op
          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        }
        return checkLock(dbConn, extLockId);
      } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
        checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")");
        throw new MetaException("Unable to update transaction database " +
          StringUtils.stringifyException(e));
      } finally {
        unlockInternal();
        closeDbConn(dbConn);
      }
    }
    catch(RetryException e) {
      return checkLockWithRetry(dbConn, extLockId, txnId);
    }
  }
  /**
   * Why doesn't this get a txnid as parameter?  The caller should either know the txnid or know there isn't one.
   * Either way getTxnIdFromLockId() will not be needed.  This would be a Thrift change.
   *
   * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
   * in practice more often)
   * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
   *
   * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
   * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
   *
   * {@link #checkLock(java.sql.Connection, long)}  must run at SERIALIZABLE (make sure some lock we are checking
   * against doesn't move from W to A in another txn) but this method can heartbeat in
   * separate txn at READ_COMMITTED.
   *
   * Retry-by-caller note:
   * Retryable because {@link #checkLock(Connection, long)} is
   */
  @Override
  @RetrySemantics.SafeToRetry
  public LockResponse checkLock(CheckLockRequest rqst)
    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
    try {
      Connection dbConn = null;
      long extLockId = rqst.getLockid();
      try {
        lockInternal();
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        // Heartbeat on the lockid first, to assure that our lock is still valid.
        // Then look up the lock info (hopefully in the cache).  If these locks
        // are associated with a transaction then heartbeat on that as well.
        LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
        if(info == null) {
          throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
        }
        if (info.txnId > 0) {
          //lock belongs to a txn: the txn is the unit that is heartbeat/timed out
          heartbeatTxn(dbConn, info.txnId);
        }
        else {
          //standalone (read-only query) lock: heartbeat the lock itself
          heartbeatLock(dbConn, extLockId);
        }
        //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and
        //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried
        //extra heartbeat is logically harmless, but ...
        return checkLock(dbConn, extLockId);
      } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
        checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
        throw new MetaException("Unable to update transaction database " +
          JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
      } finally {
        closeDbConn(dbConn);
        unlockInternal();
      }
    } catch (RetryException e) {
      return checkLock(rqst);
    }

  }

  /**
   * This would have been made simpler if all locks were associated with a txn.  Then only txn needs to
   * be heartbeated, committed, etc.  no need for client to track individual locks.
   * When removing locks not associated with txn this potentially conflicts with
   * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db.
   * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed
   */
  @RetrySemantics.Idempotent
  public void unlock(UnlockRequest rqst)
    throws NoSuchLockException, TxnOpenException, MetaException {
    try {
      Connection dbConn = null;
      Statement stmt = null;
      long extLockId = rqst.getLockid();
      try {
        /**
         * This method is logically like commit for read-only auto commit queries.
         * READ_COMMITTED since this only has 1 delete statement and no new entries with the
         * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
         * created in a single atomic operation.
         * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
         * but hl_lock_ext_id is not known until that method returns.
         * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
         * but using SERIALIZABLE doesn't materially change the interaction.
         * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
         */
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        stmt = dbConn.createStatement();
        //hl_txnid <> 0 means it's associated with a transaction
        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" +
          " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
        //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where
        //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example
        LOG.debug("Going to execute update <" + s + ">");
        int rc = stmt.executeUpdate(s);
        if (rc < 1) {
          //the delete matched nothing: figure out why so we can report a useful error
          LOG.debug("Going to rollback");
          dbConn.rollback();
          LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
          if(info == null) {
            //didn't find any lock with extLockId but at ReadCommitted there is a possibility that
            //it existed when above delete ran but it didn't have the expected state.
            LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" +
              JavaUtils.lockIdToString(rqst.getLockid()) + ")");
            //bail here to make the operation idempotent
            return;
          }
          if(info.txnId != 0) {
            String msg = "Unlocking locks associated with transaction not permitted.  " + info;
            //if a lock is associated with a txn we can only "unlock" if it's in WAITING state
            // which really means that the caller wants to give up waiting for the lock
            LOG.error(msg);
            throw new TxnOpenException(msg);
          }
          if(info.txnId == 0) {
            //we didn't see this lock when running DELETE stmt above but now it showed up
            //so should "should never happen" happened...
+ String msg = "Found lock in unexpected state " + info; + LOG.error(msg); + throw new MetaException(msg); + } + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "unlock(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + unlock(rqst); + } + } + + /** + * used to sort entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse} + */ + private static class LockInfoExt extends LockInfo { + private final ShowLocksResponseElement e; + LockInfoExt(ShowLocksResponseElement e) { + super(e); + this.e = e; + } + } + @RetrySemantics.ReadOnly + public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { + try { + Connection dbConn = null; + ShowLocksResponse rsp = new ShowLocksResponse(); + List<ShowLocksResponseElement> elems = new ArrayList<>(); + List<LockInfoExt> sortedList = new ArrayList<>(); + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + + "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS"; + + // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query. 
+ String dbName = rqst.getDbname(); + String tableName = rqst.getTablename(); + String partName = rqst.getPartname(); + + StringBuilder filter = new StringBuilder(); + if (dbName != null && !dbName.isEmpty()) { + filter.append("hl_db=").append(quoteString(dbName)); + } + if (tableName != null && !tableName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_table=").append(quoteString(tableName)); + } + if (partName != null && !partName.isEmpty()) { + if (filter.length() > 0) { + filter.append(" and "); + } + filter.append("hl_partition=").append(quoteString(partName)); + } + String whereClause = filter.toString(); + + if (!whereClause.isEmpty()) { + s = s + " where " + whereClause; + } + + LOG.debug("Doing to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowLocksResponseElement e = new ShowLocksResponseElement(); + e.setLockid(rs.getLong(1)); + long txnid = rs.getLong(2); + if (!rs.wasNull()) e.setTxnid(txnid); + e.setDbname(rs.getString(3)); + e.setTablename(rs.getString(4)); + String partition = rs.getString(5); + if (partition != null) e.setPartname(partition); + switch (rs.getString(6).charAt(0)) { + case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; + case LOCK_WAITING: e.setState(LockState.WAITING); break; + default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); + } + switch (rs.getString(7).charAt(0)) { + case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; + case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; + case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; + default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); + } + e.setLastheartbeat(rs.getLong(8)); + long acquiredAt = rs.getLong(9); + if (!rs.wasNull()) e.setAcquiredat(acquiredAt); + e.setUser(rs.getString(10)); + e.setHostname(rs.getString(11)); + e.setLockIdInternal(rs.getLong(12)); + long id = rs.getLong(13); + 
if(!rs.wasNull()) { + e.setBlockedByExtId(id); + } + id = rs.getLong(14); + if(!rs.wasNull()) { + e.setBlockedByIntId(id); + } + e.setAgentInfo(rs.getString(15)); + sortedList.add(new LockInfoExt(e)); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + checkRetryable(dbConn, e, "showLocks(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined + //by checkLock() - makes diagnostics easier. + Collections.sort(sortedList, new LockInfoComparator()); + for(LockInfoExt lockInfoExt : sortedList) { + elems.add(lockInfoExt.e); + } + rsp.setLocks(elems); + return rsp; + } catch (RetryException e) { + return showLocks(rqst); + } + } + + /** + * {@code ids} should only have txnid or lockid but not both, ideally. + * Currently DBTxnManager.heartbeat() enforces this. 
+ */ + @Override + @RetrySemantics.SafeToRetry + public void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + try { + Connection dbConn = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + heartbeatLock(dbConn, ids.getLockid()); + heartbeatTxn(dbConn, ids.getTxnid()); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeat(" + ids + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + heartbeat(ids); + } + } + @Override + @RetrySemantics.SafeToRetry + public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); + Set<Long> nosuch = new HashSet<>(); + Set<Long> aborted = new HashSet<>(); + rsp.setNosuch(nosuch); + rsp.setAborted(aborted); + try { + /** + * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)} + * only has 1 update statement in it and + * we only update existing txns, i.e. 
nothing can add additional txns that this operation + * would care about (which would have required SERIALIZABLE) + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + /*do fast path first (in 1 statement) if doesn't work, rollback and do the long version*/ + stmt = dbConn.createStatement(); + List<String> queries = new ArrayList<>(); + int numTxnsToHeartbeat = (int) (rqst.getMax() - rqst.getMin() + 1); + List<Long> txnIds = new ArrayList<>(numTxnsToHeartbeat); + for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { + txnIds.add(txn); + } + TxnUtils.buildQueryWithINClause(conf, queries, + new StringBuilder("update TXNS set txn_last_heartbeat = " + getDbTime(dbConn) + + " where txn_state = " + quoteChar(TXN_OPEN) + " and "), + new StringBuilder(""), txnIds, "txn_id", true, false); + int updateCnt = 0; + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + updateCnt += stmt.executeUpdate(query); + } + if (updateCnt == numTxnsToHeartbeat) { + //fast pass worked, i.e. 
all txns we were asked to heartbeat were Open as expected + dbConn.commit(); + return rsp; + } + //if here, do the slow path so that we can return info txns which were not in expected state + dbConn.rollback(); + for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { + try { + heartbeatTxn(dbConn, txn); + } catch (NoSuchTxnException e) { + nosuch.add(txn); + } catch (TxnAbortedException e) { + aborted.add(txn); + } + } + return rsp; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + } + } catch (RetryException e) { + return heartbeatTxnRange(rqst); + } + } + + long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { + // Get the id for the next entry in the queue + String s = sqlGenerator.addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + LOG.debug("going to execute query <" + s + ">"); + try (ResultSet rs = stmt.executeQuery(s)) { + if (!rs.next()) { + throw new IllegalStateException("Transaction tables not properly initiated, " + + "no record found in next_compaction_queue_id"); + } + long id = rs.getLong(1); + s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + return id; + } + } + @Override + @RetrySemantics.Idempotent + public CompactionResponse compact(CompactionRequest rqst) throws MetaException { + // Put a compaction request in the queue. + try { + Connection dbConn = null; + Statement stmt = null; + TxnStore.MutexAPI.LockHandle handle = null; + try { + lockInternal(); + /** + * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 entry in + * Initiated/Working state for any resource. 
This ensures that we don't run concurrent
         * compactions for any resource.
         */
        handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        stmt = dbConn.createStatement();

        long id = generateCompactionQueueId(stmt);

        //dedupe check: if a compaction for this db/table/partition is already queued or
        //running, return its id instead of enqueueing a second one.
        //NOTE(review): request fields are concatenated into SQL via quoteString(); presumably
        //safe for internal callers but not parameterized - verify inputs are trusted.
        StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
          append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
          append(",").append(quoteChar(WORKING_STATE)).
          append(") AND cq_database=").append(quoteString(rqst.getDbname())).
          append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND ");
        if(rqst.getPartitionname() == null) {
          sb.append("cq_partition is null");
        }
        else {
          sb.append("cq_partition=").append(quoteString(rqst.getPartitionname()));
        }

        LOG.debug("Going to execute query <" + sb.toString() + ">");
        ResultSet rs = stmt.executeQuery(sb.toString());
        if(rs.next()) {
          //duplicate found - report it back; rs is closed implicitly when stmt is
          //closed in the finally block
          long enqueuedId = rs.getLong(1);
          String state = compactorStateToResponse(rs.getString(2).charAt(0));
          LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
            "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
            " with id=" + enqueuedId);
          return new CompactionResponse(enqueuedId, state, false);
        }
        close(rs);
        //build the insert; optional columns (partition, properties, run_as) are appended
        //only when present, and the matching values are appended in the same order below
        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
          "cq_table, ");
        String partName = rqst.getPartitionname();
        if (partName != null) buf.append("cq_partition, ");
        buf.append("cq_state, cq_type");
        if (rqst.getProperties() != null) {
          buf.append(", cq_tblproperties");
        }
        if (rqst.getRunas() != null) buf.append(", cq_run_as");
        buf.append(") values (");
        buf.append(id);
        buf.append(", '");
        buf.append(rqst.getDbname());
        buf.append("', '");
        buf.append(rqst.getTablename());
        buf.append("', '");
        if (partName != null) {
          buf.append(partName);
          buf.append("', '");
        }
        buf.append(INITIATED_STATE);
        buf.append("', '");
        switch (rqst.getType()) {
          case MAJOR:
            buf.append(MAJOR_TYPE);
            break;

          case MINOR:
            buf.append(MINOR_TYPE);
            break;

          default:
            LOG.debug("Going to rollback");
            dbConn.rollback();
            throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
        }
        if (rqst.getProperties() != null) {
          buf.append("', '");
          buf.append(new StringableMap(rqst.getProperties()).toString());
        }
        if (rqst.getRunas() != null) {
          buf.append("', '");
          buf.append(rqst.getRunas());
        }
        buf.append("')");
        String s = buf.toString();
        LOG.debug("Going to execute update <" + s + ">");
        stmt.executeUpdate(s);
        LOG.debug("Going to commit");
        dbConn.commit();
        return new CompactionResponse(id, INITIATED_RESPONSE, true);
      } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
        checkRetryable(dbConn, e, "compact(" + rqst + ")");
        throw new MetaException("Unable to select from transaction database " +
            StringUtils.stringifyException(e));
      } finally {
        closeStmt(stmt);
        closeDbConn(dbConn);
        if(handle != null) {
          handle.releaseLocks();
        }
        unlockInternal();
      }
    } catch (RetryException e) {
      // Transient DB failure; retry the whole compact() call.
      return compact(rqst);
    }
  }

  /**
   * Maps a single-char compactor state (as stored in COMPACTION_QUEUE /
   * COMPLETED_COMPACTIONS) to the human-readable response string; unknown
   * states fall through to the raw character.
   */
  private static String compactorStateToResponse(char s) {
    switch (s) {
      case INITIATED_STATE: return INITIATED_RESPONSE;
      case WORKING_STATE: return WORKING_RESPONSE;
      case READY_FOR_CLEANING: return CLEANING_RESPONSE;
      case FAILED_STATE: return FAILED_RESPONSE;
      case SUCCEEDED_STATE: return SUCCEEDED_RESPONSE;
      case ATTEMPTED_STATE: return ATTEMPTED_RESPONSE;
      default:
        return Character.toString(s);
    }
  }

  /**
   * Returns all compactions, both queued/running (COMPACTION_QUEUE) and
   * finished (COMPLETED_COMPACTIONS), as a single unordered list.
   */
  @RetrySemantics.ReadOnly
  public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
    ShowCompactResponse response = new ShowCompactResponse(new ArrayList<>());
    Connection dbConn = null;
    Statement stmt = null;
    try {
      try {
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        stmt = dbConn.createStatement();
        String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
            //-1 because 'null' literal doesn't work for all DBs...
            "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
            "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
            "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
        //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
        //to sort so that currently running jobs are at the end of the list (bottom of screen)
        //and currently running ones are in sorted by start time
        //w/o order by likely currently running compactions will be first (LHS of Union)
        LOG.debug("Going to execute query <" + s + ">");
        ResultSet rs = stmt.executeQuery(s);
        // Both legs of the union produce the same 11 columns; positions 1..11 are
        // read below in that fixed order.
        while (rs.next()) {
          ShowCompactResponseElement e = new ShowCompactResponseElement();
          e.setDbname(rs.getString(1));
          e.setTablename(rs.getString(2));
          e.setPartitionname(rs.getString(3));
          e.setState(compactorStateToResponse(rs.getString(4).charAt(0)));
          switch (rs.getString(5).charAt(0)) {
            case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
            case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
            default:
              //do nothing to handle RU/D if we add another status
          }
          e.setWorkerid(rs.getString(6));
          long start = rs.getLong(7);
          if(!rs.wasNull()) {
            e.setStart(start);
          }
          // -1 is the sentinel substituted for NULL cc_end in the queue leg above.
          long endTime = rs.getLong(8);
          if(endTime != -1) {
            e.setEndTime(endTime);
          }
          e.setRunAs(rs.getString(9));
          e.setHadoopJobId(rs.getString(10));
          e.setId(rs.getLong(11));
          response.addToCompacts(e);
        }
        // Read-only operation: roll back to release the DB transaction.
        LOG.debug("Going to rollback");
        dbConn.rollback();
      } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
        checkRetryable(dbConn, e, "showCompact(" + rqst + ")");
        throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + return response; + } catch (RetryException e) { + return showCompact(rqst); + } + } + + private static void shouldNeverHappen(long txnid) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid)); + } + private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " " + + JavaUtils.lockIdToString(extLockId) + " " + intLockId); + } + + /** + * Retry-by-caller note: + * This may be retried after dbConn.commit. At worst, it will create duplicate entries in + * TXN_COMPONENTS which won't affect anything. See more comments in {@link #commitTxn(CommitTxnRequest)} + */ + @Override + @RetrySemantics.SafeToRetry + public void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet lockHandle = null; + ResultSet rs = null; + try { + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if(lockHandle == null) { + //ensures txn is still there and in expected state + ensureValidTxn(dbConn, rqst.getTxnid(), stmt); + shouldNeverHappen(rqst.getTxnid()); + } + //for RU this may be null so we should default it to 'u' which is most restrictive + OpertaionType ot = OpertaionType.UPDATE; + if(rqst.isSetOperationType()) { + ot = OpertaionType.fromDataOperationType(rqst.getOperationType()); + } + List<String> rows = new ArrayList<>(); + for (String partName : rqst.getPartitionnames()) { + rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst)); + } + int modCount = 0; + 
//record partitions that were written to + List<String> queries = sqlGenerator.createInsertValuesStmt( + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows); + for(String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + modCount = stmt.executeUpdate(query); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")"); + throw new MetaException("Unable to insert into from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(lockHandle, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + addDynamicPartitions(rqst); + } + } + + /** + * Clean up corresponding records in metastore tables when corresponding object is dropped, + * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc + * operations. 
+ */ + @Override + @RetrySemantics.Idempotent + public void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + + try { + String dbName; + String tblName; + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<String> queries = new ArrayList<>(); + StringBuilder buff = new StringBuilder(); + + switch (type) { + case DATABASE: + dbName = db.getName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case TABLE: + dbName = table.getDbName(); + tblName = table.getTableName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from 
COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case PARTITION: + dbName = table.getDbName(); + tblName = table.getTableName(); + List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns + List<String> partVals; // partition values + String partName; + + while (partitionIterator.hasNext()) { + Partition p = partitionIterator.next(); + partVals = p.getValues(); + partName = Warehouse.makePartName(partCols, partVals); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("' and tc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("' and ctc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("' and cq_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("' and cc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + } + + break; + default: + throw new MetaException("Invalid object type for cleanup: " + type); + } + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + stmt.executeUpdate(query); + } + + LOG.debug("Going to commit"); + 
        dbConn.commit();
      } catch (SQLException e) {
        LOG.debug("Going to rollback");
        rollbackDBConn(dbConn);
        checkRetryable(dbConn, e, "cleanupRecords");
        // Best-effort cleanup: if the txn tables themselves are gone, warn and move on.
        if (e.getMessage().contains("does not exist")) {
          LOG.warn("Cannot perform cleanup since metastore table does not exist");
        } else {
          throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
        }
      } finally {
        closeStmt(stmt);
        closeDbConn(dbConn);
      }
    } catch (RetryException e) {
      cleanupRecords(type, db, table, partitionIterator);
    }
  }

  /**
   * For testing only, do not use.
   */
  @VisibleForTesting
  public int numLocksInLockTable() throws SQLException, MetaException {
    Connection dbConn = null;
    Statement stmt = null;
    ResultSet rs = null;
    try {
      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
      stmt = dbConn.createStatement();
      String s = "select count(*) from HIVE_LOCKS";
      LOG.debug("Going to execute query <" + s + ">");
      rs = stmt.executeQuery(s);
      rs.next();
      int rc = rs.getInt(1);
      // Necessary to clean up the transaction in the db.
      dbConn.rollback();
      return rc;
    } finally {
      close(rs, stmt, dbConn);
    }
  }

  /**
   * For testing only, do not use.
   */
  public long setTimeout(long milliseconds) {
    long previous_timeout = timeout;
    timeout = milliseconds;
    return previous_timeout;
  }

  /**
   * Internal marker exception: thrown after checkRetryable decides a SQLException
   * was transient; callers catch it and re-invoke the whole operation.
   */
  protected class RetryException extends Exception {

  }

  /** Obtains a pooled connection (autocommit off) at the given isolation level. */
  Connection getDbConn(int isolationLevel) throws SQLException {
    return getDbConn(isolationLevel, connPool);
  }
  private Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException {
    // Retry checkout up to 10 times when doRetryOnConnPool is set, else fail fast.
    int rc = doRetryOnConnPool ? 10 : 1;
    Connection dbConn = null;
    while (true) {
      try {
        dbConn = connPool.getConnection();
        dbConn.setAutoCommit(false);
        dbConn.setTransactionIsolation(isolationLevel);
        return dbConn;
      } catch (SQLException e){
        closeDbConn(dbConn);
        if ((--rc) <= 0) throw e;
        LOG.error("There is a problem with a connection from th
<TRUNCATED>
