deniskuzZ commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841568394


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -267,36 +216,201 @@ public TxnType execute(MultiDataSourceJdbcResource 
jdbcResource) throws MetaExce
        * If RO < W, then there is no reads-from relationship.
        * In replication flow we don't expect any write write conflict as it 
should have been handled at source.
        */
-      assert true;
+      return CommitInfo.empty();
+    }
+
+    String conflictSQLSuffix = String.format("""
+        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
+        """, OperationType.UPDATE, OperationType.DELETE);
+    String writeSetInsertSql = """
+        INSERT INTO "WRITE_SET"
+          ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID", 
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+        SELECT DISTINCT
+          "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+          :commitId,
+          "TC_OPERATION_TYPE"
+        """;
+
+    boolean isUpdateOrDelete = 
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
+        jdbcResource.getSqlGenerator()
+            .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid),
+        ResultSet::next));
+
+    Long commitId = null;
+    long tempCommitId = 0L;
+    
+    if (isUpdateOrDelete) {
+      tempCommitId = TxnUtils.generateTemporaryId();
+      //if here it means currently committing txn performed update/delete and 
we should check WW conflict
+      /**
+       * "select distinct" is used below because
+       * 1. once we get to multi-statement txns, we only care to record that 
something was updated once
+       * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried 
by caller it may create
+       *  duplicate entries in TXN_COMPONENTS
+       * but we want to add a PK on WRITE_SET which won't have unique rows w/o 
this distinct
+       * even if it includes all of its columns
+       *
+       * First insert into write_set using a temporary commitID, which will be 
updated in a separate call,
+       * see: {@link 
#updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource, long, TxnType, 
Long, long)}.
+       * This should decrease the scope of the S4U lock on the next_txn_id 
table.
+       */
+      Object undoWriteSetForCurrentTxn = context.createSavepoint();
+      jdbcResource.getJdbcTemplate().update(
+          writeSetInsertSql + (ConfVars.useMinHistoryLevel() ? 
conflictSQLSuffix :
+          "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+              (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND 
\"TC_OPERATION_TYPE\" <> :type")),
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("type", OperationType.COMPACT.getSqlConst())
+              .addValue("commitId", tempCommitId));
+
+      /**
+       * This S4U will mutex with other commitTxn() and openTxns().
+       * -1 below makes txn intervals look like [3,3] [4,4] if all txns are 
serial
+       * Note: it's possible to have several txns have the same commit id.  
Suppose 3 txns start
+       * at the same time and no new txns start until all 3 commit.
+       * We could've incremented the sequence for commitId as well but it 
doesn't add anything functionally.
+       */
+      new AcquireTxnLockFunction(false).execute(jdbcResource);
+      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+
+      if (!rqst.isExclWriteEnabled()) {
+        /**
+         * see if there are any overlapping txns that wrote the same element, 
i.e. have a conflict
+         * Since entire commit operation is mutexed wrt other start/commit ops,
+         * committed.ws_commit_id <= current.ws_commit_id for all txns
+         * thus if committed.ws_commit_id < current.ws_txnid, transactions do 
NOT overlap
+         * For example, [17,20] is committed, [6,80] is being committed right 
now - these overlap
+         * [17,20] committed and [21,21] committing now - these do not overlap.
+         * [17,18] committed and [18,19] committing now - these overlap  (here 
18 started while 17 was still running)
+         */
+        WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
+        if (info != null) {
+          handleWriteConflict(jdbcResource, context, 
undoWriteSetForCurrentTxn, info, txnid, commitId,
+              isReplayedReplTxn);
+        }
+      }
+    } else if (!ConfVars.useMinHistoryLevel()) {
+      jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM 
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("commitId", jdbcResource.execute(new 
GetHighWaterMarkHandler())));
     }
 
+    return new CommitInfo(tempCommitId, commitId, isUpdateOrDelete);
+  }
+
+  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid)
+      throws MetaException {
+    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
+        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
+        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
+        "\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", 
\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" " +
+        "FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
+        "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
+        //For partitioned table we always track writes at partition level 
(never at table)
+        //and for non partitioned - always at table level, thus the same table 
should never
+        //have entries with partition key and w/o
+        "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
+        "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " + 
//txns overlap; could replace ws_txnid
+        // with txnid, though any decent DB should infer this
+        "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has 
rows we just inserted as
+        // part of this commitTxn() op
+        "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has 
committed txns
+        //U+U and U+D and D+D is a conflict and we don't currently track I in 
WRITE_SET at all
+        //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
+        //where each does "delete X + insert X", where X is a row with the same 
PK.  This is
+        //equivalent to an update of X but won't be in conflict unless D+D is 
in conflict.
+        //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
+        //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
+        // un-serializable to allow concurrent deletes
+        "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
+        "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
+    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
+    return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid)
+            .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
+            .addValue("opDelete", OperationType.DELETE.getSqlConst()),
+        rs -> rs.next()
+            ? new WriteSetInfo(
+                rs.getLong("WS_TXNID"), rs.getLong("WS_COMMIT_ID"),
+                rs.getString("CUR_OP"), rs.getString("COMMITTED_OP"),
+                rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"), 
rs.getString("WS_PARTITION")
+              )
+            : null);
+  }
+
+  private void handleWriteConflict(MultiDataSourceJdbcResource jdbcResource, 
TransactionContext context,
+      Object undoWriteSetForCurrentTxn, WriteSetInfo info, long txnid, Long 
commitId, boolean isReplayedReplTxn)
+      throws MetaException, TxnAbortedException {
+    //found a conflict, so let's abort the txn
+    String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId()) + "," + 
info.commitId() + "]";
+    StringBuilder resource = new 
StringBuilder(info.database()).append("/").append(info.table());
+    if (info.partition() != null) {
+      resource.append('/').append(info.partition());
+    }
+    String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + 
commitId + "]" + " due to a write conflict on " + resource +
+        " committed by " + committedTxn + " " + info.currOpType() + "/" + 
info.opType();
+    //remove WRITE_SET info for current txn since it's about to abort
+    context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
+    LOG.info(msg);
+    //TODO: should make abortTxns() write something into TXNS.TXN_META_INFO 
about this
+    int count = new AbortTxnsFunction(Collections.singletonList(txnid), false, 
false,
+        isReplayedReplTxn, 
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource);
+    if (count != 1) {
+      throw new IllegalStateException(msg + " FAILED!");
+    }
+    throw new TxnAbortedException(msg);
+  }
+
+  private void handleCompletedTxnComponents(MultiDataSourceJdbcResource 
jdbcResource, long txnid, TxnType txnType,
+      char isUpdateDelete, boolean isReplayedReplTxn, long sourceTxnId) throws 
MetaException {
     if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && 
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
       moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
+
     } else if (isReplayedReplTxn) {
       if (rqst.isSetWriteEventInfos() && !rqst.getWriteEventInfos().isEmpty()) 
{
-        jdbcResource.execute(new InsertCompletedTxnComponentsCommand(txnid, 
isUpdateDelete, rqst.getWriteEventInfos()));
+        jdbcResource.execute(
+            new InsertCompletedTxnComponentsCommand(txnid, isUpdateDelete, 
rqst.getWriteEventInfos()));
       }
-      jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId, 
rqst.getReplPolicy()));
+      jdbcResource.execute(
+          new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
     }
-    updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId, 
tempCommitId);
+  }
 
-    if (rqst.isSetKeyValue()) {
-      updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
-    }
+  private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource 
jdbcResource, long txnid, char isUpdateDelete) {
+    // Move the record from txn_components into completed_txn_components so 
that the compactor
+    // knows where to look to compact.
+    String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", 
\"CTC_DATABASE\", " +
+        "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", 
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
+        "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", 
:flag FROM \"TXN_COMPONENTS\" " +
+        "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
+    //we only track compactor activity in TXN_COMPONENTS to handle the case 
where the
+    //compactor txn aborts - so don't bother copying it to 
COMPLETED_TXN_COMPONENTS
+    LOG.debug("Going to execute insert <{}>", query);
+    int affectedRows = jdbcResource.getJdbcTemplate().update(query,
+        new MapSqlParameterSource()
+            .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
+            .addValue("txnid", txnid)
+            .addValue("type", OperationType.COMPACT.getSqlConst(), 
Types.CHAR));
 
-    if (!isHiveReplTxn) {
-      createCommitNotificationEvent(jdbcResource, txnid , txnType, 
txnWriteDetails);
+    if (affectedRows < 1) {
+      //this can be reasonable for an empty txn START/COMMIT or read-only txn
+      //also an IUD with DP that didn't match any rows.
+      LOG.info("Expected to move at least one record from txn_components to "
+          + "completed_txn_components when committing txn! {}", 
JavaUtils.txnIdToString(txnid));
     }
+  }
 
-    LOG.debug("Going to commit");
-
-    if (MetastoreConf.getBoolVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
-    }
-    return txnType;
+  private static char toUpdateDeleteFlag(boolean isUpdateOrDelete) {
+    return isUpdateOrDelete ? 'Y' : 'N';

Review Comment:
   changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to