veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1425276096
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -648,260 +400,85 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
throw new MetaException("Invalid input for number of txns: " + numTxns);
}
- try {
- Connection dbConn = null;
- Statement stmt = null;
- try {
- /*
- * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
- * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
- * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
- * performance degradation with high transaction load.
- * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
- * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
- * the period.
- * Example: At t[0] there is one open transaction in the TXNS table, T[1].
- * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
- * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
- * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot's
- * open transaction list. T[1] because it is presented as open in TXNS,
- * T[2] because it is a missing sequence.
- *
- * In the current design, there can be several metastore instances running in a given Warehouse.
- * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
- * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
- * Now the same client will start another transaction, except it ends up on MS2 and may get
- * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot
- * on read will think the version of the row from transaction ID 500 is the latest one.
- *
- * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This
- * set could support a write-through cache for added performance.
- */
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- /*
- * The openTxn and commitTxn must be mutexed, when committing a not read only transaction.
- * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
- * Since table locks are working in Derby, we don't need the lockInternal call here.
- * Example: Suppose we have two transactions with update like x = x+1.
- * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
- * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
- * and we will have a lost update problem
- */
- acquireTxnLock(stmt, true);
- // Measure the time from acquiring the sequence value, till committing in the TXNS table
- StopWatch generateTransactionWatch = new StopWatch();
- generateTransactionWatch.start();
-
- List<Long> txnIds = openTxns(dbConn, rqst);
+ /*
+ * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+ * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
+ * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
+ * performance degradation with high transaction load.
+ * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+ * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
+ * the period.
+ * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+ * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+ * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+ * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot's
+ * open transaction list. T[1] because it is presented as open in TXNS,
+ * T[2] because it is a missing sequence.
+ *
+ * In the current design, there can be several metastore instances running in a given Warehouse.
+ * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
+ * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+ * Now the same client will start another transaction, except it ends up on MS2 and may get
+ * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot
+ * on read will think the version of the row from transaction ID 500 is the latest one.
+ *
+ * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This
+ * set could support a write-through cache for added performance.
+ */
+ /*
+ * The openTxn and commitTxn must be mutexed, when committing a not read only transaction.
+ * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+ * Since table locks are working in Derby, we don't need the lockInternal call here.
+ * Example: Suppose we have two transactions with update like x = x+1.
+ * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
+ * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
+ * and we will have a lost update problem
+ */
+ acquireTxnLock(true);
+ // Measure the time from acquiring the sequence value, till committing in the TXNS table
+ StopWatch generateTransactionWatch = new StopWatch();
+ generateTransactionWatch.start();
- LOG.debug("Going to commit");
- dbConn.commit();
- generateTransactionWatch.stop();
- long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
- TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
- if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
- /*
- * The commit was too slow, we can not allow this to continue (except if it is read only,
- * since that can not cause dirty reads).
- * When calculating the snapshot for a given transaction, we look back for possible open transactions
- * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
- * We can not allow a write transaction, that was slower than TXN_OPENTXN_TIMEOUT to continue,
- * because there can be other transactions running, that didn't consider this transactionId open,
- * this could cause dirty reads.
- */
- LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting
transactionIds: {}", elapsedMillis, txnIds);
- deleteInvalidOpenTransactions(dbConn, txnIds);
- dbConn.commit();
- /*
- * We do not throw RetryException directly, to not circumvent the max retry limit
- */
- throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
- }
- return new OpenTxnsResponse(txnIds);
- } catch (SQLException e) {
- LOG.debug("Going to rollback: ", e);
- rollbackDBConn(dbConn);
- checkRetryable(e, "openTxns(" + rqst + ")");
- throw new MetaException("Unable to select from transaction database "
+ StringUtils.stringifyException(e));
- } finally {
- close(null, stmt, dbConn);
- }
- } catch (RetryException e) {
- return openTxns(rqst);
- }
- }
+ List<Long> txnIds = new OpenTxnsFunction(rqst, openTxnTimeOutMillis, transactionalListeners).execute(jdbcResource);
- private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst)
- throws SQLException, MetaException {
- int numTxns = rqst.getNum_txns();
- // Make sure the user has not requested an insane amount of txns.
- int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
- if (numTxns > maxTxns) {
- numTxns = maxTxns;
- }
- List<PreparedStatement> insertPreparedStmts = null;
+ LOG.debug("Going to commit");
+ jdbcResource.getTransactionManager().getActiveTransaction().createSavepoint();
+ generateTransactionWatch.stop();
+ long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
- boolean isReplayedReplTxn = txnType == TxnType.REPL_CREATED;
- boolean isHiveReplTxn = rqst.isSetReplPolicy() && txnType == TxnType.DEFAULT;
- try {
- if (isReplayedReplTxn) {
- assert rqst.isSetReplPolicy();
- List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), dbConn);
-
- if (!targetTxnIdList.isEmpty()) {
- if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
- LOG.warn("target txn id number {} is not matching with source txn
id number {}",
- targetTxnIdList, rqst.getReplSrcTxnIds());
- }
- LOG.info("Target transactions {} are present for repl policy : {}
and Source transaction id : {}",
- targetTxnIdList.toString(), rqst.getReplPolicy(),
rqst.getReplSrcTxnIds().toString());
- return targetTxnIdList;
- }
- }
-
- long minOpenTxnId = 0;
- if (useMinHistoryLevel) {
- minOpenTxnId = getMinOpenTxnIdWaterMark(dbConn);
- }
-
- List<Long> txnIds = new ArrayList<>(numTxns);
+ if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
/*
- * The getGeneratedKeys are not supported in every dbms, after executing a multi line insert.
- * But it is supported in every used dbms for single line insert, even if the metadata says otherwise.
- * If the getGeneratedKeys are not supported first we insert a random batchId in the TXN_META_INFO field,
- * then the keys are selected back with that batchid.
+ * The commit was too slow, we can not allow this to continue (except if it is read only,
+ * since that can not cause dirty reads).
+ * When calculating the snapshot for a given transaction, we look back for possible open transactions
+ * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
+ * We can not allow a write transaction, that was slower than TXN_OPENTXN_TIMEOUT to continue,
+ * because there can be other transactions running, that didn't consider this transactionId open,
+ * this could cause dirty reads.
*/
- boolean genKeySupport = dbProduct.supportsGetGeneratedKeys();
- genKeySupport = genKeySupport || (numTxns == 1);
-
- String insertQuery = String.format(TXNS_INSERT_QRY, getEpochFn(dbProduct), getEpochFn(dbProduct));
- LOG.debug("Going to execute insert <{}>", insertQuery);
- try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
- String state = genKeySupport ? TxnStatus.OPEN.getSqlConst() : TXN_TMP_STATE;
- if (numTxns == 1) {
- ps.setString(1, state);
- ps.setString(2, rqst.getUser());
- ps.setString(3, rqst.getHostname());
- ps.setInt(4, txnType.getValue());
- txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, false));
- } else {
- for (int i = 0; i < numTxns; ++i) {
- ps.setString(1, state);
- ps.setString(2, rqst.getUser());
- ps.setString(3, rqst.getHostname());
- ps.setInt(4, txnType.getValue());
- ps.addBatch();
-
- if ((i + 1) % maxBatchSize == 0) {
- txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
- }
- }
- if (numTxns % maxBatchSize != 0) {
- txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
- }
- }
- }
-
- assert txnIds.size() == numTxns;
-
- addTxnToMinHistoryLevel(dbConn, txnIds, minOpenTxnId);
-
- if (isReplayedReplTxn) {
- List<String> rowsRepl = new ArrayList<>(numTxns);
- List<String> params = Collections.singletonList(rqst.getReplPolicy());
- List<List<String>> paramsList = new ArrayList<>(numTxns);
- for (int i = 0; i < numTxns; i++) {
- rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," +
txnIds.get(i));
- paramsList.add(params);
- }
-
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"REPL_TXN_MAP\" (\"RTM_REPL_POLICY\", \"RTM_SRC_TXN_ID\", \"RTM_TARGET_TXN_ID\")", rowsRepl,
- paramsList);
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.execute();
- }
- }
-
- if (transactionalListeners != null && !isHiveReplTxn) {
- MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, txnType), dbConn, sqlGenerator);
- }
- return txnIds;
- } finally {
- if (insertPreparedStmts != null) {
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.close();
- }
- }
- }
- }
+ LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting
transactionIds: {}", elapsedMillis, txnIds);
- private List<Long> executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, boolean genKeySupport,
- PreparedStatement ps, boolean batch) throws SQLException {
- List<Long> txnIds = new ArrayList<>();
- if (batch) {
- ps.executeBatch();
- } else {
- // For slight performance advantage we do not use the executeBatch, when we only have one row
- ps.execute();
- }
- if (genKeySupport) {
- try (ResultSet generatedKeys = ps.getGeneratedKeys()) {
- while (generatedKeys.next()) {
- txnIds.add(generatedKeys.getLong(1));
- }
- }
- } else {
- try (PreparedStatement pstmt =
- dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE
\"TXN_STATE\" = ?")) {
- pstmt.setString(1, TXN_TMP_STATE);
- try (ResultSet rs = pstmt.executeQuery()) {
- while (rs.next()) {
- txnIds.add(rs.getLong(1));
- }
+ if (!txnIds.isEmpty()) {
+ try {
+ sqlRetryHandler.executeWithRetry(new SqlRetryCallProperties().withCallerId("deleteInvalidOpenTransactions"),
+ () -> {
+ jdbcResource.execute(new DeleteInvalidOpenTxnsCommand(txnIds));
+ LOG.info("Removed transactions: ({}) from TXNS", txnIds);
+ jdbcResource.execute(new RemoveTxnsFromMinHistoryLevelCommand(txnIds));
+ return null;
+ });
Review Comment:
Yes, openTxns() has the `Transactional` annotation
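
For anyone skimming the hunk above, the rough shape this refers to is sketched below. It is pieced together from the diff, not a verbatim excerpt of `TxnHandler`: the annotation placement is inferred from this reply, the savepoint and elapsed-time bookkeeping are omitted, and the `commitTookTooLong` flag stands in for the timeout check.

```java
// Sketch only: identifiers are taken from the hunk above; exception handling
// around the retry call is omitted.
@Transactional
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
  // Runs inside the method-level transaction boundary.
  List<Long> txnIds = new OpenTxnsFunction(rqst, openTxnTimeOutMillis, transactionalListeners)
      .execute(jdbcResource);

  boolean commitTookTooLong = false; // placeholder for the elapsedMillis >= openTxnTimeOutMillis check
  if (commitTookTooLong && !txnIds.isEmpty()) {
    // The cleanup gets its own retry loop, nested inside the outer transaction.
    sqlRetryHandler.executeWithRetry(
        new SqlRetryCallProperties().withCallerId("deleteInvalidOpenTransactions"),
        () -> {
          jdbcResource.execute(new DeleteInvalidOpenTxnsCommand(txnIds));
          jdbcResource.execute(new RemoveTxnsFromMinHistoryLevelCommand(txnIds));
          return null;
        });
  }
  return new OpenTxnsResponse(txnIds);
}
```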
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]