This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 764c0be4788 HIVE-29434: CommitTxnFunction refactor (#6290)
764c0be4788 is described below
commit 764c0be478874f593a9f8bf39bb784028e883ff4
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Feb 24 11:01:00 2026 +0200
HIVE-29434: CommitTxnFunction refactor (#6290)
---
.../txn/jdbc/functions/CommitTxnFunction.java | 675 ++++++++++++---------
1 file changed, 373 insertions(+), 302 deletions(-)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
index 661f7b37e6e..522ce558df5 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
-import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
@@ -60,21 +59,22 @@
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import java.sql.ResultSet;
-import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
import static
org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
@@ -84,6 +84,9 @@ public class CommitTxnFunction implements
TransactionalFunction<TxnType> {
private static final Logger LOG =
LoggerFactory.getLogger(CommitTxnFunction.class);
+ private static final EnumSet<TxnType> WRITE_SET_SKIP_TXN_TYPES =
+ EnumSet.of(TxnType.READ_ONLY, TxnType.SOFT_DELETE, TxnType.COMPACTION);
+
private final CommitTxnRequest rqst;
private final List<TransactionalMetaStoreEventListener>
transactionalListeners;
@@ -94,23 +97,17 @@ public CommitTxnFunction(CommitTxnRequest rqst,
List<TransactionalMetaStoreEvent
@Override
public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws
MetaException, NoSuchTxnException, TxnAbortedException {
- char isUpdateDelete = 'N';
long txnid = rqst.getTxnid();
long sourceTxnId = -1;
- boolean isReplayedReplTxn =
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
- boolean isHiveReplTxn = rqst.isSetReplPolicy() &&
TxnType.DEFAULT.equals(rqst.getTxn_type());
+ boolean isReplayedReplTxn = TxnType.REPL_CREATED == rqst.getTxn_type();
+ boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT ==
rqst.getTxn_type();
//Find the write details for this transaction.
//Doing it here before the metadata tables are updated below.
- List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
-
- if (!isHiveReplTxn) {
- txnWriteDetails = jdbcResource.execute(new
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
+ List<TxnWriteDetails> txnWriteDetails = loadTxnWriteDetails(jdbcResource,
isHiveReplTxn, txnid);
- }
// Get the current TXN
TransactionContext context =
jdbcResource.getTransactionManager().getActiveTransaction();
- Long commitId = null;
if (rqst.isSetReplLastIdInfo()) {
updateReplId(jdbcResource, rqst.getReplLastIdInfo());
@@ -118,145 +115,99 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
if (isReplayedReplTxn) {
assert (rqst.isSetReplPolicy());
+ txnid = resolveTargetTxnId(jdbcResource, rqst.getTxnid());
sourceTxnId = rqst.getTxnid();
- List<Long> targetTxnIds = jdbcResource.execute(new
TargetTxnIdListHandler(rqst.getReplPolicy(),
Collections.singletonList(sourceTxnId)));
- if (targetTxnIds.isEmpty()) {
- // Idempotent case where txn was already closed or commit txn event
received without
- // corresponding open txn event.
- LOG.info("Target txn id is missing for source txn id : {} and repl
policy {}", sourceTxnId,
- rqst.getReplPolicy());
- throw new RollbackException(null);
- }
- assert targetTxnIds.size() == 1;
- txnid = targetTxnIds.getFirst();
}
- /**
- * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures
that no other
- * operation can change this txn (such acquiring locks). While lock() and
commitTxn()
- * should not normally run concurrently (for same txn) but could due to
bugs in the client
- * which could then corrupt internal transaction manager state. Also
competes with abortTxn().
- */
- TxnType txnType = jdbcResource.execute(new
GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), txnid));
+ TxnType txnType = getOpenTxnTypeAndLock(jdbcResource, txnid,
isReplayedReplTxn);
if (txnType == null) {
- //if here, txn was not found (in expected state)
- TxnStatus actualTxnStatus = jdbcResource.execute(new
FindTxnStateHandler(txnid));
- if (actualTxnStatus == TxnStatus.COMMITTED) {
- if (isReplayedReplTxn) {
- // in case of replication, idempotent is taken care by getTargetTxnId
- LOG.warn("Invalid state COMMITTED for transactions started using
replication replay task");
- }
- /**
- * This makes the operation idempotent
- * (assume that this is most likely due to retry logic)
- */
- LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
- return null;
- }
- TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+ return null;
}
- String conflictSQLSuffix = String.format("""
- FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND
"TC_OPERATION_TYPE" IN (%s, %s)
- """, OperationType.UPDATE, OperationType.DELETE);
- long tempCommitId = TxnUtils.generateTemporaryId();
+ CommitInfo commitInfo = prepareWriteSetAndCheckConflicts(
+ jdbcResource, context,
+ txnid, txnType, isReplayedReplTxn);
- if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
- if (!ConfVars.useMinHistoryWriteId()) {
- new AcquireTxnLockFunction(false).execute(jdbcResource);
- }
- commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+ handleCompletedTxnComponents(
+ jdbcResource,
+ txnid, txnType, commitInfo.opFlag(), isReplayedReplTxn, sourceTxnId);
- } else if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) {
- String writeSetInsertSql = """
- INSERT INTO "WRITE_SET"
- ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID",
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
- SELECT DISTINCT
- "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
- :commitId,
- "TC_OPERATION_TYPE"
- """;
-
- boolean isUpdateOrDelete =
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
- jdbcResource.getSqlGenerator()
- .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
- new MapSqlParameterSource()
- .addValue("txnId", txnid),
- ResultSet::next));
-
- if (isUpdateOrDelete) {
- isUpdateDelete = 'Y';
- //if here it means currently committing txn performed update/delete
and we should check WW conflict
- /**
- * "select distinct" is used below because
- * 1. once we get to multi-statement txns, we only care to record that
something was updated once
- * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is
retried by caller it may create
- * duplicate entries in TXN_COMPONENTS
- * but we want to add a PK on WRITE_SET which won't have unique rows
w/o this distinct
- * even if it includes all of its columns
- *
- * First insert into write_set using a temporary commitID, which will
be updated in a separate call,
- * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long,
TxnType, Long, long)}}.
- * This should decrease the scope of the S4U lock on the next_txn_id
table.
- */
- Object undoWriteSetForCurrentTxn = context.createSavepoint();
- jdbcResource.getJdbcTemplate().update(
- writeSetInsertSql + (ConfVars.useMinHistoryLevel() ?
conflictSQLSuffix :
- "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
- (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND
\"TC_OPERATION_TYPE\" <> :type")),
- new MapSqlParameterSource()
- .addValue("txnId", txnid)
- .addValue("type", OperationType.COMPACT.getSqlConst())
- .addValue("commitId", tempCommitId));
+ if (rqst.isSetKeyValue()) {
+ updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
+ }
- /**
- * This S4U will mutex with other commitTxn() and openTxns().
- * -1 below makes txn intervals look like [3,3] [4,4] if all txns are
serial
- * Note: it's possible to have several txns have the same commit id.
Suppose 3 txns start
- * at the same time and no new txns start until all 3 commit.
- * We could've incremented the sequence for commitId as well but it
doesn't add anything functionally.
- */
- new AcquireTxnLockFunction(false).execute(jdbcResource);
- commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
-
- if (!rqst.isExclWriteEnabled()) {
- /**
- * see if there are any overlapping txns that wrote the same
element, i.e. have a conflict
- * Since entire commit operation is mutexed wrt other start/commit
ops,
- * committed.ws_commit_id <= current.ws_commit_id for all txns
- * thus if committed.ws_commit_id < current.ws_txnid, transactions
do NOT overlap
- * For example, [17,20] is committed, [6,80] is being committed
right now - these overlap
- * [17,20] committed and [21,21] committing now - these do not
overlap.
- * [17,18] committed and [18,19] committing now - these overlap
(here 18 started while 17 was still running)
- */
- WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
- if (info != null) {
- //found a conflict, so let's abort the txn
- String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId) +
"," + info.committedCommitId + "]";
- StringBuilder resource = new
StringBuilder(info.database).append("/").append(info.table);
- if (info.partition != null) {
- resource.append('/').append(info.partition);
- }
- String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," +
commitId + "]" + " due to a write conflict on " + resource +
- " committed by " + committedTxn + " " +
info.currentOperationType + "/" + info.committedOperationType;
- //remove WRITE_SET info for current txn since it's about to abort
- context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
- LOG.info(msg);
- //todo: should make abortTxns() write something into
TXNS.TXN_META_INFO about this
- if (new AbortTxnsFunction(Collections.singletonList(txnid), false,
false,
- isReplayedReplTxn,
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource) != 1) {
- throw new IllegalStateException(msg + " FAILED!");
- }
- throw new TxnAbortedException(msg);
- }
- }
- } else if (!ConfVars.useMinHistoryLevel()) {
- jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
- new MapSqlParameterSource()
- .addValue("txnId", txnid)
- .addValue("commitId", jdbcResource.execute(new
GetHighWaterMarkHandler())));
+ if (!isHiveReplTxn) {
+ createCommitNotificationEvent(jdbcResource, txnid , txnType,
txnWriteDetails);
+ }
+
+ updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType,
commitInfo);
+ LOG.debug("Going to commit");
+
+ if (MetastoreConf.getBoolVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
+
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
+ }
+ return txnType;
+ }
+
+ private List<TxnWriteDetails>
loadTxnWriteDetails(MultiDataSourceJdbcResource jdbcResource,
+ boolean isHiveReplTxn, long txnid) throws MetaException {
+ if (isHiveReplTxn) {
+ return new ArrayList<>();
+ }
+ return jdbcResource.execute(
+ new GetWriteIdsMappingForTxnIdsHandler(Set.of(txnid)));
+ }
+
+ private Long resolveTargetTxnId(MultiDataSourceJdbcResource jdbcResource,
long sourceTxnId)
+ throws MetaException {
+ List<Long> targetTxnIds = jdbcResource.execute(
+ new TargetTxnIdListHandler(rqst.getReplPolicy(),
List.of(sourceTxnId)));
+ if (targetTxnIds.isEmpty()) {
+ // Idempotent case where txn was already closed or commit txn event
received without
+ // corresponding open txn event.
+ LOG.info("Target txn id is missing for source txn id : {} and repl
policy {}", sourceTxnId,
+ rqst.getReplPolicy());
+ throw new RollbackException(null);
+ }
+ return targetTxnIds.getFirst();
+ }
+
+ /**
+ * Runs at READ_COMMITTED with S4U on the TXNS row for "txnid". S4U ensures
that no other
+ * operation can change this txn (such as acquiring locks). lock() and
commitTxn()
+ * should not normally run concurrently (for the same txn), but they could due to bugs
in the client,
+ * which could then corrupt internal transaction manager state. Also
competes with abortTxn().
+ */
+ private TxnType getOpenTxnTypeAndLock(MultiDataSourceJdbcResource
jdbcResource, long txnid,
+ boolean isReplayedReplTxn) throws MetaException, NoSuchTxnException,
TxnAbortedException {
+ TxnType txnType = jdbcResource.execute(
+ new GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(),
txnid));
+ if (txnType != null) {
+ return txnType;
+ }
+ //if here, txn was not found (in expected state)
+ TxnStatus actualTxnStatus = jdbcResource.execute(new
FindTxnStateHandler(txnid));
+ if (actualTxnStatus == TxnStatus.COMMITTED) {
+ if (isReplayedReplTxn) {
+ // in case of replication, idempotency is taken care of by getTargetTxnId
+ LOG.warn("Invalid state COMMITTED for transactions started using
replication replay task");
}
- } else {
+ /**
+ * This makes the operation idempotent
+ * (assume that this is most likely due to retry logic)
+ */
+ LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
+ return null;
+ }
+ TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+ return null;
+ }
+
+ private CommitInfo
prepareWriteSetAndCheckConflicts(MultiDataSourceJdbcResource jdbcResource,
+ TransactionContext context, long txnid, TxnType txnType, boolean
isReplayedReplTxn)
+ throws MetaException, TxnAbortedException {
+
+ if (WRITE_SET_SKIP_TXN_TYPES.contains(txnType) || isReplayedReplTxn) {
/*
* current txn didn't update/delete anything (may have inserted), so
just proceed with commit
*
@@ -267,36 +218,197 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
* If RO < W, then there is no reads-from relationship.
* In replication flow we don't expect any write write conflict as it
should have been handled at source.
*/
- assert true;
+ return CommitInfo.empty();
}
+ String conflictSQLSuffix = String.format("""
+ FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND
"TC_OPERATION_TYPE" IN (%s, %s)
+ """, OperationType.UPDATE, OperationType.DELETE);
+ String writeSetInsertSql = """
+ INSERT INTO "WRITE_SET"
+ ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID",
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+ SELECT DISTINCT
+ "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+ :commitId,
+ "TC_OPERATION_TYPE"
+ """;
+
+ boolean isUpdateOrDelete =
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
+ jdbcResource.getSqlGenerator()
+ .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid),
+ ResultSet::next));
+
+ Long commitId = null;
+ long tempCommitId = 0L;
+
+ if (isUpdateOrDelete) {
+ tempCommitId = TxnUtils.generateTemporaryId();
+ //if here it means currently committing txn performed update/delete and
we should check WW conflict
+ /**
+ * "select distinct" is used below because
+ * 1. once we get to multi-statement txns, we only care to record that
something was updated once
+ * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried
by caller it may create
+ * duplicate entries in TXN_COMPONENTS
+ * but we want to add a PK on WRITE_SET which won't have unique rows w/o
this distinct
+ * even if it includes all of its columns
+ *
+ * First insert into write_set using a temporary commitID, which will be
updated in a separate call,
+ * see: {@link
#updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource, long, TxnType,
CommitInfo)}.
+ * This should decrease the scope of the S4U lock on the next_txn_id
table.
+ */
+ Object undoWriteSetForCurrentTxn = context.createSavepoint();
+ jdbcResource.getJdbcTemplate().update(
+ writeSetInsertSql + (ConfVars.useMinHistoryLevel() ?
conflictSQLSuffix :
+ "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+ (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND
\"TC_OPERATION_TYPE\" <> :type")),
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("type", OperationType.COMPACT.getSqlConst())
+ .addValue("commitId", tempCommitId));
+
+ /**
+ * This S4U will mutex with other commitTxn() and openTxns().
+ * -1 below makes txn intervals look like [3,3] [4,4] if all txns are
serial
+ * Note: it's possible to have several txns have the same commit id.
Suppose 3 txns start
+ * at the same time and no new txns start until all 3 commit.
+ * We could've incremented the sequence for commitId as well but it
doesn't add anything functionally.
+ */
+ new AcquireTxnLockFunction(false).execute(jdbcResource);
+ commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+
+ if (!rqst.isExclWriteEnabled()) {
+ /**
+ * see if there are any overlapping txns that wrote the same element,
i.e. have a conflict
+ * Since entire commit operation is mutexed wrt other start/commit ops,
+ * committed.ws_commit_id <= current.ws_commit_id for all txns
+ * thus if committed.ws_commit_id < current.ws_txnid, transactions do
NOT overlap
+ * For example, [17,20] is committed, [6,80] is being committed right
now - these overlap
+ * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here
18 started while 17 was still running)
+ */
+ WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
+ if (info != null) {
+ handleWriteConflict(jdbcResource, context,
undoWriteSetForCurrentTxn, info, txnid, commitId,
+ isReplayedReplTxn);
+ }
+ }
+ } else if (!ConfVars.useMinHistoryLevel()) {
+ jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("commitId", jdbcResource.execute(new
GetHighWaterMarkHandler())));
+ }
+
+ return new CommitInfo(tempCommitId, commitId, isUpdateOrDelete);
+ }
+
+ private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid)
+ throws MetaException {
+ String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
+ "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
+ "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
+ "\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\",
\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" " +
+ "FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
+ "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
+ //For partitioned table we always track writes at partition level
(never at table)
+ //and for non partitioned - always at table level, thus the same table
should never
+ //have entries with partition key and w/o
+ "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL))
" +
+ "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " +
//txns overlap; could replace ws_txnid
+ // with txnid, though any decent DB should infer this
+ "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has
rows we just inserted as
+ // part of this commitTxn() op
+ "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has
committed txns
+ //U+U and U+D and D+D is a conflict and we don't currently track I in
WRITE_SET at all
+ //it may seem like D+D should not be in conflict but consider 2
multi-stmt txns
+ //where each does "delete X + insert X, where X is a row with the same
PK. This is
+ //equivalent to an update of X but won't be in conflict unless D+D is
in conflict.
+ //The same happens when Hive splits U=I+D early so it looks like 2
branches of a
+ //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
+ // un-serializable to allow concurrent deletes
+ "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
+ "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
+ LOG.debug("Going to execute query: <{}>", writeConflictQuery);
+ return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid)
+ .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
+ .addValue("opDelete", OperationType.DELETE.getSqlConst()),
+ rs -> rs.next()
+ ? new WriteSetInfo(
+ rs.getLong("WS_TXNID"), rs.getLong("WS_COMMIT_ID"),
+ rs.getString("CUR_OP"), rs.getString("COMMITTED_OP"),
+ rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"),
rs.getString("WS_PARTITION")
+ )
+ : null);
+ }
+
+ private void handleWriteConflict(MultiDataSourceJdbcResource jdbcResource,
TransactionContext context,
+ Object undoWriteSetForCurrentTxn, WriteSetInfo info, long txnid, Long
commitId, boolean isReplayedReplTxn)
+ throws MetaException, TxnAbortedException {
+ //found a conflict, so let's abort the txn
+ String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId()) + "," +
info.commitId() + "]";
+ StringBuilder resource = new
StringBuilder(info.database()).append("/").append(info.table());
+ if (info.partition() != null) {
+ resource.append('/').append(info.partition());
+ }
+ String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," +
commitId + "]" + " due to a write conflict on " + resource +
+ " committed by " + committedTxn + " " + info.currOpType() + "/" +
info.opType();
+ //remove WRITE_SET info for current txn since it's about to abort
+ context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
+ LOG.info(msg);
+ //TODO: should make abortTxns() write something into TXNS.TXN_META_INFO
about this
+ int count = new AbortTxnsFunction(Collections.singletonList(txnid), false,
false,
+ isReplayedReplTxn,
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource);
+ if (count != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ throw new TxnAbortedException(msg);
+ }
+
+ private void handleCompletedTxnComponents(MultiDataSourceJdbcResource
jdbcResource, long txnid, TxnType txnType,
+ char isUpdateDelete, boolean isReplayedReplTxn, long sourceTxnId) throws
MetaException {
if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn &&
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
+
} else if (isReplayedReplTxn) {
if (rqst.isSetWriteEventInfos() && !rqst.getWriteEventInfos().isEmpty())
{
- jdbcResource.execute(new InsertCompletedTxnComponentsCommand(txnid,
isUpdateDelete, rqst.getWriteEventInfos()));
+ jdbcResource.execute(
+ new InsertCompletedTxnComponentsCommand(txnid, isUpdateDelete,
rqst.getWriteEventInfos()));
}
- jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId,
rqst.getReplPolicy()));
- }
- updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId,
tempCommitId);
-
- if (rqst.isSetKeyValue()) {
- updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
- }
-
- if (!isHiveReplTxn) {
- createCommitNotificationEvent(jdbcResource, txnid , txnType,
txnWriteDetails);
+ jdbcResource.execute(
+ new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
}
+ }
- LOG.debug("Going to commit");
+ private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource
jdbcResource, long txnid, char isUpdateDelete) {
+ // Move the record from txn_components into completed_txn_components so
that the compactor
+ // knows where to look to compact.
+ String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\",
\"CTC_DATABASE\", " +
+ "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\",
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
+ "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\",
:flag FROM \"TXN_COMPONENTS\" " +
+ "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
+ //we only track compactor activity in TXN_COMPONENTS to handle the case
where the
+ //compactor txn aborts - so don't bother copying it to
COMPLETED_TXN_COMPONENTS
+ LOG.debug("Going to execute insert <{}>", query);
+ int affectedRows = jdbcResource.getJdbcTemplate().update(query,
+ new MapSqlParameterSource()
+ .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
+ .addValue("txnid", txnid)
+ .addValue("type", OperationType.COMPACT.getSqlConst(),
Types.CHAR));
- if (MetastoreConf.getBoolVar(jdbcResource.getConf(),
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
+ if (affectedRows < 1) {
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
+ //also an IUD with DP that didn't match any rows.
+ LOG.info("Expected to move at least one record from txn_components to "
+ + "completed_txn_components when committing txn! {}",
JavaUtils.txnIdToString(txnid));
}
- return txnType;
}
- private void updateReplId(MultiDataSourceJdbcResource jdbcResource,
ReplLastIdInfo replLastIdInfo) throws MetaException {
+ private void updateReplId(MultiDataSourceJdbcResource jdbcResource,
ReplLastIdInfo replLastIdInfo)
+ throws MetaException {
String lastReplId = Long.toString(replLastIdInfo.getLastReplId());
String catalog = replLastIdInfo.isSetCatalog() ?
normalizeIdentifier(replLastIdInfo.getCatalog()) :
MetaStoreUtils.getDefaultCatalog(jdbcResource.getConf());
@@ -320,7 +432,7 @@ private void updateReplId(MultiDataSourceJdbcResource
jdbcResource, ReplLastIdIn
}
private long updateDatabaseProp(MultiDataSourceJdbcResource jdbcResource,
String catalog, String database,
- String prop, String propValue) throws
MetaException {
+ String prop, String propValue) throws MetaException {
String query =
"SELECT d.\"DB_ID\", dp.\"PARAM_KEY\", dp.\"PARAM_VALUE\" FROM
\"DATABASE_PARAMS\" dp\n" +
"RIGHT JOIN \"DBS\" d ON dp.\"DB_ID\" = d.\"DB_ID\" " +
@@ -334,7 +446,9 @@ private long updateDatabaseProp(MultiDataSourceJdbcResource
jdbcResource, String
.addValue("catalog", catalog),
//no row means database no found
rs -> rs.next()
- ? new DbEntityParam(rs.getLong("DB_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE"))
+ ? new DbEntityParam(
+ rs.getLong("DB_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ )
: null);
if (dbEntityParam == null) {
@@ -343,31 +457,31 @@ private long
updateDatabaseProp(MultiDataSourceJdbcResource jdbcResource, String
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"DATABASE_PARAMS\" VALUES (:dbId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"DATABASE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"DB_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, database);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for db: {} using command {}", prop, database,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("dbId", dbEntityParam.id)
+ .addValue("dbId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("DATABASE_PARAMS is corrupted for database: "
+ database);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private long updateTableProp(MultiDataSourceJdbcResource jdbcResource,
String catalog, String db, long dbId,
- String table, String prop, String propValue)
throws MetaException {
+ String table, String prop, String propValue) throws MetaException {
String query =
"SELECT t.\"TBL_ID\", tp.\"PARAM_KEY\", tp.\"PARAM_VALUE\" FROM
\"TABLE_PARAMS\" tp " +
"RIGHT JOIN \"TBLS\" t ON tp.\"TBL_ID\" = t.\"TBL_ID\" " +
@@ -381,7 +495,9 @@ private long updateTableProp(MultiDataSourceJdbcResource
jdbcResource, String ca
.addValue("dbId", dbId),
//no row means table no found
rs -> rs.next()
- ? new DbEntityParam(rs.getLong("TBL_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE"))
+ ? new DbEntityParam(
+ rs.getLong("TBL_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ )
: null);
if (dbEntityParam == null) {
@@ -390,56 +506,60 @@ private long updateTableProp(MultiDataSourceJdbcResource
jdbcResource, String ca
//TODO: would be better to replace with MERGE or UPSERT
String command;
- if (dbEntityParam.key == null) {
+ if (dbEntityParam.key() == null) {
command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
- } else if (!dbEntityParam.value.equals(propValue)) {
+ } else if (!dbEntityParam.value().equals(propValue)) {
command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
} else {
LOG.info("Database property: {} with value: {} already updated for db:
{}", prop, propValue, db);
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating {} for table: {} using command {}", prop, table,
command);
}
SqlParameterSource params = new MapSqlParameterSource()
- .addValue("tblId", dbEntityParam.id)
+ .addValue("tblId", dbEntityParam.id())
.addValue("key", prop)
.addValue("value", propValue);
if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
//only one row insert or update should happen
throw new RuntimeException("TABLE_PARAMS is corrupted for table: " +
table);
}
- return dbEntityParam.id;
+ return dbEntityParam.id();
}
private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource,
long tableId,
- List<String> partList, String prop, String
propValue) {
+ List<String> partList, String prop, String propValue) {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
//language=SQL
- prefix.append(
- "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM
\"PARTITION_PARAMS\" pp\n" +
- "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+ prefix.append("""
+ SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM
"PARTITION_PARAMS" pp
+ RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
// Populate the complete query with provided prefix and suffix
TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries,
prefix, suffix, partList,
"\"PART_NAME\"", true, false);
+
SqlParameterSource params = new MapSqlParameterSource()
.addValue("tblId", tableId)
.addValue("key", prop);
+
+ RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+ new DbEntityParam(
+ rs.getLong("PART_ID"), rs.getString("PARAM_KEY"),
rs.getString("PARAM_VALUE")
+ );
List<DbEntityParam> partitionParams = new ArrayList<>();
- for(String query : queries) {
+
+ for (String query : queries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + query + ">");
}
- jdbcResource.getJdbcTemplate().query(query, params,
- (ResultSet rs) -> {
- while (rs.next()) {
- partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"),
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
- }
- });
+ partitionParams.addAll(
+ jdbcResource.getJdbcTemplate().query(query, params, mapper)
+ );
}
//TODO: would be better to replace with MERGE or UPSERT
@@ -447,93 +567,41 @@ private void
updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, long
//all insert in one batch
int[][] inserts =
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
- partitionParams.stream().filter(p -> p.key ==
null).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() == null)
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
- ps.setLong(1, argument.id);
- ps.setString(2, argument.key);
+ ps.setLong(1, argument.id());
+ ps.setString(2, argument.key());
ps.setString(3, propValue);
});
//all update in one batch
int[][] updates
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
"UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\"
= ? AND \"PARAM_KEY\" = ?",
- partitionParams.stream().filter(p -> p.key != null &&
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+ partitionParams.stream()
+ .filter(p -> p.key() != null && !propValue.equals(p.value()))
+ .toList(),
+ maxBatchSize,
(ps, argument) -> {
ps.setString(1, propValue);
- ps.setLong(2, argument.id);
- ps.setString(3, argument.key);
+ ps.setLong(2, argument.id());
+ ps.setString(3, argument.key());
});
- if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() +
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+ int totalChanges =
+ Arrays.stream(inserts)
+ .flatMapToInt(IntStream::of)
+ .sum()
+ + Arrays.stream(updates)
+ .flatMapToInt(IntStream::of)
+ .sum();
+
+ if (totalChanges != partList.size()) {
throw new RuntimeException("PARTITION_PARAMS is corrupted, update
failed");
}
}
- private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource
jdbcResource, long txnid) throws MetaException {
- String writeConflictQuery =
jdbcResource.getSqlGenerator().addLimitClause(1,
- "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
- "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\",
\"COMMITTED\".\"WS_PARTITION\", " +
- "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\",
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
- "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
- "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
- //For partitioned table we always track writes at partition level
(never at table)
- //and for non partitioned - always at table level, thus the same table
should never
- //have entries with partition key and w/o
- "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL))
" +
- "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " +
//txns overlap; could replace ws_txnid
- // with txnid, though any decent DB should infer this
- "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has
rows we just inserted as
- // part of this commitTxn() op
- "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has
committed txns
- //U+U and U+D and D+D is a conflict and we don't currently track I in
WRITE_SET at all
- //it may seem like D+D should not be in conflict but consider 2
multi-stmt txns
- //where each does "delete X + insert X, where X is a row with the same
PK. This is
- //equivalent to an update of X but won't be in conflict unless D+D is
in conflict.
- //The same happens when Hive splits U=I+D early so it looks like 2
branches of a
- //multi-insert stmt (an Insert and a Delete branch). It also 'feels'
- // un-serializable to allow concurrent deletes
- "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
- "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
- LOG.debug("Going to execute query: <{}>", writeConflictQuery);
- return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
- new MapSqlParameterSource()
- .addValue("txnId", txnid)
- .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
- .addValue("opDelete", OperationType.DELETE.getSqlConst()),
- (ResultSet rs) -> {
- if(rs.next()) {
- return new WriteSetInfo(rs.getLong("WS_TXNID"),
rs.getLong("CUR_WS_COMMIT_ID"),
- rs.getLong("WS_COMMIT_ID"), rs.getString("CUR_OP"),
rs.getString("COMMITTED_OP"),
- rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"),
rs.getString("WS_PARTITION"));
- } else {
- return null;
- }
- });
- }
-
- private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource
jdbcResource, long txnid, char isUpdateDelete) {
- // Move the record from txn_components into completed_txn_components so
that the compactor
- // knows where to look to compact.
- String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\",
\"CTC_DATABASE\", " +
- "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\",
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
- "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\",
:flag FROM \"TXN_COMPONENTS\" " +
- "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
- //we only track compactor activity in TXN_COMPONENTS to handle the case
where the
- //compactor txn aborts - so don't bother copying it to
COMPLETED_TXN_COMPONENTS
- LOG.debug("Going to execute insert <{}>", query);
- int affectedRows = jdbcResource.getJdbcTemplate().update(query,
- new MapSqlParameterSource()
- .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
- .addValue("txnid", txnid)
- .addValue("type", OperationType.COMPACT.getSqlConst(),
Types.CHAR));
-
- if (affectedRows < 1) {
- //this can be reasonable for an empty txn START/COMMIT or read-only txn
- //also an IUD with DP that didn't match any rows.
- LOG.info("Expected to move at least one record from txn_components to "
- + "completed_txn_components when committing txn! {}",
JavaUtils.txnIdToString(txnid));
- }
- }
-
private void updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource
jdbcResource, CommitTxnRequest rqst) {
if (!rqst.getKeyValue().getKey().startsWith(TxnStore.TXN_KEY_START)) {
String errorMsg = "Error updating key/value in the sql backend with"
@@ -567,12 +635,13 @@ private void
updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes
}
private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource
jdbcResource,
- long txnid, TxnType txnType, Long commitId, long tempId) throws
MetaException {
+ long txnid, TxnType txnType, CommitInfo commitInfo) throws
MetaException {
+
List<String> queryBatch = new ArrayList<>(6);
// update write_set with real commitId
- if (commitId != null) {
- queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " + commitId
+
- " WHERE \"WS_COMMIT_ID\" = " + tempId + " AND \"WS_TXNID\" = " +
txnid);
+ if (!commitInfo.isEmpty()) {
+ queryBatch.add("UPDATE \"WRITE_SET\" SET \"WS_COMMIT_ID\" = " +
commitInfo.commitId() +
+ " WHERE \"WS_COMMIT_ID\" = " + commitInfo.tempId() + " AND
\"WS_TXNID\" = " + txnid);
}
// clean up txn related metadata
if (txnType != TxnType.READ_ONLY) {
@@ -581,23 +650,36 @@ private void
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
// DO NOT remove the transaction from the TXN table, the cleaner will
remove it when appropriate
queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " +
TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
+
if (txnType == TxnType.MATER_VIEW_REBUILD) {
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE
\"MRL_TXN_ID\" = " + txnid);
}
+
if (txnType == TxnType.SOFT_DELETE ||
MetaStoreServerUtils.isCompactionTxn(txnType)) {
- queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " +
commitId + ", \"CQ_COMMIT_TIME\" = " +
- getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE
\"CQ_TXN_ID\" = " + txnid);
+ // For soft-delete and compaction transactions, we need to set the
commit id
+ if (commitInfo.isEmpty() && !ConfVars.useMinHistoryWriteId()) {
+ // Take a global txn lock to synchronize the openTxn and commitTxn
operations
+ new AcquireTxnLockFunction(false).execute(jdbcResource);
+ }
+ queryBatch.add("""
+ UPDATE "COMPACTION_QUEUE" SET "CQ_NEXT_TXN_ID" = %s,
"CQ_COMMIT_TIME" = %s
+ WHERE "CQ_TXN_ID" = %d"""
+ .formatted(
+ defaultIfNull(commitInfo.commitId(), "(SELECT MAX(\"TXN_ID\")
FROM \"TXNS\")"),
+ getEpochFn(jdbcResource.getDatabaseProduct()),
+ txnid));
}
// execute all in one batch
-
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(queryBatch.toArray(new
String[0]));
+ jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
+ queryBatch.toArray(new String[0]));
List<Function<List<Long>, InClauseBatchCommand<Long>>> commands = List.of(
RemoveTxnsFromMinHistoryLevelCommand::new,
RemoveWriteIdsFromMinHistoryCommand::new
);
for (var cmd : commands) {
- jdbcResource.execute(cmd.apply(ImmutableList.of(txnid)));
+ jdbcResource.execute(cmd.apply(List.of(txnid)));
}
}
@@ -609,56 +691,45 @@ private void
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
* @param txnWriteDetails write details of the transaction
* @throws MetaException ex
*/
- private void createCommitNotificationEvent(MultiDataSourceJdbcResource
jdbcResource, long txnid, TxnType txnType, List<TxnWriteDetails>
txnWriteDetails)
- throws MetaException {
+ private void createCommitNotificationEvent(MultiDataSourceJdbcResource
jdbcResource, long txnid, TxnType txnType,
+ List<TxnWriteDetails> txnWriteDetails) throws MetaException {
if (transactionalListeners != null) {
- notifyCommitOrAbortEvent(txnid, EventMessage.EventType.COMMIT_TXN,
txnType, jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
+ notifyCommitOrAbortEvent(
+ txnid, EventMessage.EventType.COMMIT_TXN, txnType,
jdbcResource.getConnection(),
+ txnWriteDetails, transactionalListeners);
CompactionInfo compactionInfo = jdbcResource.execute(new
GetCompactionInfoHandler(txnid, true));
if (compactionInfo != null) {
- MetaStoreListenerNotifier
- .notifyEventWithDirectSql(transactionalListeners,
EventMessage.EventType.COMMIT_COMPACTION,
- new CommitCompactionEvent(txnid, compactionInfo),
jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
+ MetaStoreListenerNotifier.notifyEventWithDirectSql(
+ transactionalListeners, EventMessage.EventType.COMMIT_COMPACTION,
new CommitCompactionEvent(txnid, compactionInfo),
+ jdbcResource.getConnection(), jdbcResource.getSqlGenerator());
} else {
LOG.warn("No compaction queue record found for Compaction type
transaction commit. txnId:" + txnid);
}
-
}
}
- private static class DbEntityParam {
- final long id;
- final String key;
- final String value;
-
- public DbEntityParam(long id, String key, String value) {
- this.id = id;
- this.key = key;
- this.value = value;
- }
+ private record DbEntityParam(long id, String key, String value) {
}
- private static class WriteSetInfo {
- final long txnId;
- final long currentCommitId;
- final long committedCommitId;
- final String currentOperationType;
- final String committedOperationType;
- final String database;
- final String table;
- final String partition;
-
- public WriteSetInfo(long txnId, long currentCommitId, long
committedCommitId,
- String currentOperationType, String
committedOperationType,
- String database, String table, String partition) {
- this.txnId = txnId;
- this.currentCommitId = currentCommitId;
- this.committedCommitId = committedCommitId;
- this.currentOperationType = currentOperationType;
- this.committedOperationType = committedOperationType;
- this.database = database;
- this.table = table;
- this.partition = partition;
+ private record WriteSetInfo(
+ long txnId, long commitId, String currOpType, String opType,
+ String database, String table, String partition) {
+ }
+
+ private record CommitInfo(
+ long tempId, Long commitId, boolean isUpdateOrDelete) {
+
+ private static CommitInfo empty() {
+ return new CommitInfo(0L, null, false);
+ }
+
+ public boolean isEmpty() {
+ return commitId == null;
+ }
+
+ public char opFlag() {
+ return isUpdateOrDelete ? 'Y' : 'N';
}
}