deniskuzZ commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841457965


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -94,169 +97,115 @@ public CommitTxnFunction(CommitTxnRequest rqst, 
List<TransactionalMetaStoreEvent
 
   @Override
   public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws 
MetaException, NoSuchTxnException, TxnAbortedException {
-    char isUpdateDelete = 'N';
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
 
-    boolean isReplayedReplTxn = 
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
-    boolean isHiveReplTxn = rqst.isSetReplPolicy() && 
TxnType.DEFAULT.equals(rqst.getTxn_type());
+    boolean isReplayedReplTxn = TxnType.REPL_CREATED == rqst.getTxn_type();
+    boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT == 
rqst.getTxn_type();
     //Find the write details for this transaction.
     //Doing it here before the metadata tables are updated below.
-    List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
-
-    if (!isHiveReplTxn) {
-      txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
+    List<TxnWriteDetails> txnWriteDetails = loadTxnWriteDetails(jdbcResource, 
isHiveReplTxn, txnid);
 
-    }
     // Get the current TXN
     TransactionContext context = 
jdbcResource.getTransactionManager().getActiveTransaction();
-    Long commitId = null;
 
     if (rqst.isSetReplLastIdInfo()) {
       updateReplId(jdbcResource, rqst.getReplLastIdInfo());
     }
 
     if (isReplayedReplTxn) {
       assert (rqst.isSetReplPolicy());
+      txnid = resolveTargetTxnId(jdbcResource, rqst.getTxnid());
       sourceTxnId = rqst.getTxnid();
-      List<Long> targetTxnIds = jdbcResource.execute(new 
TargetTxnIdListHandler(rqst.getReplPolicy(), 
Collections.singletonList(sourceTxnId)));
-      if (targetTxnIds.isEmpty()) {
-        // Idempotent case where txn was already closed or commit txn event 
received without
-        // corresponding open txn event.
-        LOG.info("Target txn id is missing for source txn id : {} and repl 
policy {}", sourceTxnId,
-            rqst.getReplPolicy());
-        throw new RollbackException(null);
-      }
-      assert targetTxnIds.size() == 1;
-      txnid = targetTxnIds.getFirst();
     }
 
-    /**
-     * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures 
that no other
-     * operation can change this txn (such acquiring locks). While lock() and 
commitTxn()
-     * should not normally run concurrently (for same txn) but could due to 
bugs in the client
-     * which could then corrupt internal transaction manager state.  Also 
competes with abortTxn().
-     */
-    TxnType txnType = jdbcResource.execute(new 
GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), txnid));
+    TxnType txnType = getOpenTxnTypeAndLock(jdbcResource, txnid, 
isReplayedReplTxn);
     if (txnType == null) {
-      //if here, txn was not found (in expected state)
-      TxnStatus actualTxnStatus = jdbcResource.execute(new 
FindTxnStateHandler(txnid));
-      if (actualTxnStatus == TxnStatus.COMMITTED) {
-        if (isReplayedReplTxn) {
-          // in case of replication, idempotent is taken care by getTargetTxnId
-          LOG.warn("Invalid state COMMITTED for transactions started using 
replication replay task");
-        }
-        /**
-         * This makes the operation idempotent
-         * (assume that this is most likely due to retry logic)
-         */
-        LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
-        return null;
-      }
-      TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+      return null;
     }
 
-    String conflictSQLSuffix = String.format("""
-        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
-        """, OperationType.UPDATE, OperationType.DELETE);
-    long tempCommitId = TxnUtils.generateTemporaryId();
+    CommitInfo commitInfo = prepareWriteSetAndCheckConflicts(
+        jdbcResource, context, txnid, txnType, isReplayedReplTxn);
+    char isUpdateDelete = toUpdateDeleteFlag(commitInfo.isUpdateOrDelete());
 
-    if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
-      if (!ConfVars.useMinHistoryWriteId()) {
-        new AcquireTxnLockFunction(false).execute(jdbcResource);
-      }
-      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());

Review Comment:
   This block was refactored and moved to `updateWSCommitIdAndCleanUpMetadata`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to