zabetak commented on code in PR #6290:
URL: https://github.com/apache/hive/pull/6290#discussion_r2841170965


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long 
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, db);
-      return dbEntityParam.id;
+      return dbEntityParam.id();
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for table: {}  using command {}", prop, table, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("tblId", dbEntityParam.id)
+        .addValue("tblId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("TABLE_PARAMS is corrupted for table: " + 
table);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
   
   private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, 
long tableId,
-                                   List<String> partList, String prop, String 
propValue) {
+      List<String> partList, String prop, String propValue) {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
     //language=SQL
-    prefix.append(
-        "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM 
\"PARTITION_PARAMS\" pp\n" +
-        "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE 
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+    prefix.append("""
+        SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM 
"PARTITION_PARAMS" pp
+        RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE 
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
 
     // Populate the complete query with provided prefix and suffix
     TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries, 
prefix, suffix, partList,
         "\"PART_NAME\"", true, false);
+
     SqlParameterSource params = new MapSqlParameterSource()
         .addValue("tblId", tableId)
         .addValue("key", prop);
+
+    RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+        new DbEntityParam(
+            rs.getLong("PART_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+        );
     List<DbEntityParam> partitionParams = new ArrayList<>();
-    for(String query : queries) {
+
+    for (String query : queries) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + query + ">");
       }
-      jdbcResource.getJdbcTemplate().query(query, params,
-          (ResultSet rs) -> {
-            while (rs.next()) {
-              partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
-            }
-          });
+      partitionParams.addAll(
+          jdbcResource.getJdbcTemplate().query(query, params, mapper)
+      );
     }
 
     //TODO: would be better to replace with MERGE or UPSERT
     int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
     //all insert in one batch
     int[][] inserts = 
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
-        partitionParams.stream().filter(p -> p.key == 
null).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() == null)
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
-          ps.setLong(1, argument.id);
-          ps.setString(2, argument.key);
+          ps.setLong(1, argument.id());
+          ps.setString(2, argument.key());
           ps.setString(3, propValue);
         });
     //all update in one batch
     int[][] updates 
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\" 
= ? AND \"PARAM_KEY\" = ?",
-        partitionParams.stream().filter(p -> p.key != null && 
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() != null && !propValue.equals(p.value()))
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
           ps.setString(1, propValue);
-          ps.setLong(2, argument.id);
-          ps.setString(3, argument.key);
+          ps.setLong(2, argument.id());
+          ps.setString(3, argument.key());
         });
 
-    if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() + 
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+    int totalChanges =
+        Arrays.stream(inserts)
+            .flatMapToInt(IntStream::of)
+            .sum()
+      + Arrays.stream(updates)
+            .flatMapToInt(IntStream::of)
+            .sum();
+
+    if (totalChanges != partList.size()) {
       throw new RuntimeException("PARTITION_PARAMS is corrupted, update 
failed");      
     }    
   }
 
-  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid) throws MetaException {

Review Comment:
   Moving this method creates a big and unnecessary diff. Bringing it back to 
its original position will significantly reduce backport conflicts and make 
the real changes easier to follow.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -581,23 +652,36 @@ private void 
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
     queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
     // DO NOT remove the transaction from the TXN table, the cleaner will 
remove it when appropriate
     queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + 
TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
+
     if (txnType == TxnType.MATER_VIEW_REBUILD) {
       queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE 
\"MRL_TXN_ID\" = " + txnid);
     }
+
     if (txnType == TxnType.SOFT_DELETE || 
MetaStoreServerUtils.isCompactionTxn(txnType)) {
-      queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + 
commitId + ", \"CQ_COMMIT_TIME\" = " +
-          getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE 
\"CQ_TXN_ID\" = " + txnid);
+      // For soft-delete and compaction transactions, we need to set the 
commit id
+      if (commitInfo.isEmpty() && !ConfVars.useMinHistoryWriteId()) {
+        // Take a global txn lock to synchronize the openTxn and commitTxn 
operations
+        new AcquireTxnLockFunction(false).execute(jdbcResource);
+      }
+      queryBatch.add("""
+          UPDATE "COMPACTION_QUEUE" SET "CQ_NEXT_TXN_ID" = %s, 
"CQ_COMMIT_TIME" = %s
+          WHERE "CQ_TXN_ID" = %d"""
+          .formatted(
+              defaultIfNull(commitInfo.commitId(), "(SELECT MAX(\"TXN_ID\") 
FROM \"TXNS\")"),

Review Comment:
   This default value for setting `CQ_NEXT_TXN_ID` seems like new code. Was it 
there before? Why do we need it now?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long 
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, db);
-      return dbEntityParam.id;
+      return dbEntityParam.id();
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for table: {}  using command {}", prop, table, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("tblId", dbEntityParam.id)
+        .addValue("tblId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("TABLE_PARAMS is corrupted for table: " + 
table);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
   
   private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, 
long tableId,
-                                   List<String> partList, String prop, String 
propValue) {
+      List<String> partList, String prop, String propValue) {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
     //language=SQL
-    prefix.append(
-        "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM 
\"PARTITION_PARAMS\" pp\n" +
-        "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE 
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+    prefix.append("""
+        SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM 
"PARTITION_PARAMS" pp
+        RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE 
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
 
     // Populate the complete query with provided prefix and suffix
     TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries, 
prefix, suffix, partList,
         "\"PART_NAME\"", true, false);
+
     SqlParameterSource params = new MapSqlParameterSource()
         .addValue("tblId", tableId)
         .addValue("key", prop);
+
+    RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+        new DbEntityParam(
+            rs.getLong("PART_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+        );
     List<DbEntityParam> partitionParams = new ArrayList<>();
-    for(String query : queries) {
+
+    for (String query : queries) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + query + ">");
       }
-      jdbcResource.getJdbcTemplate().query(query, params,
-          (ResultSet rs) -> {
-            while (rs.next()) {
-              partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
-            }
-          });
+      partitionParams.addAll(
+          jdbcResource.getJdbcTemplate().query(query, params, mapper)
+      );
     }
 
     //TODO: would be better to replace with MERGE or UPSERT
     int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
     //all insert in one batch
     int[][] inserts = 
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
-        partitionParams.stream().filter(p -> p.key == 
null).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() == null)
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
-          ps.setLong(1, argument.id);
-          ps.setString(2, argument.key);
+          ps.setLong(1, argument.id());
+          ps.setString(2, argument.key());
           ps.setString(3, propValue);
         });
     //all update in one batch
     int[][] updates 
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\" 
= ? AND \"PARAM_KEY\" = ?",
-        partitionParams.stream().filter(p -> p.key != null && 
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() != null && !propValue.equals(p.value()))
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
           ps.setString(1, propValue);
-          ps.setLong(2, argument.id);
-          ps.setString(3, argument.key);
+          ps.setLong(2, argument.id());
+          ps.setString(3, argument.key());
         });
 
-    if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() + 
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+    int totalChanges =
+        Arrays.stream(inserts)
+            .flatMapToInt(IntStream::of)
+            .sum()
+      + Arrays.stream(updates)
+            .flatMapToInt(IntStream::of)
+            .sum();
+
+    if (totalChanges != partList.size()) {
       throw new RuntimeException("PARTITION_PARAMS is corrupted, update 
failed");      
     }    
   }
 
-  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid) throws MetaException {
-    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
-        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
-        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
-        "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", 
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +
-        "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM 
\"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
-        "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
-        //For partitioned table we always track writes at partition level 
(never at table)
-        //and for non partitioned - always at table level, thus the same table 
should never
-        //have entries with partition key and w/o
-        "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
-        "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " + 
//txns overlap; could replace ws_txnid
-        // with txnid, though any decent DB should infer this
-        "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has 
rows we just inserted as
-        // part of this commitTxn() op
-        "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has 
committed txns
-        //U+U and U+D and D+D is a conflict and we don't currently track I in 
WRITE_SET at all
-        //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
-        //where each does "delete X + insert X, where X is a row with the same 
PK.  This is
-        //equivalent to an update of X but won't be in conflict unless D+D is 
in conflict.
-        //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
-        //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
-        // un-serializable to allow concurrent deletes
-        "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
-        "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
-    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
-    return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
-        new MapSqlParameterSource()
-            .addValue("txnId", txnid)
-            .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
-            .addValue("opDelete", OperationType.DELETE.getSqlConst()),
-        (ResultSet rs) -> {
-          if(rs.next()) {
-            return new WriteSetInfo(rs.getLong("WS_TXNID"), 
rs.getLong("CUR_WS_COMMIT_ID"),
-                rs.getLong("WS_COMMIT_ID"), rs.getString("CUR_OP"), 
rs.getString("COMMITTED_OP"),
-                rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"), 
rs.getString("WS_PARTITION"));
-          } else {
-            return null;
-          }
-        });
-  }
-
-  private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource 
jdbcResource, long txnid, char isUpdateDelete) {

Review Comment:
   Please restore the method to its original position for the same reasons 
mentioned above.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -267,36 +216,201 @@ public TxnType execute(MultiDataSourceJdbcResource 
jdbcResource) throws MetaExce
        * If RO < W, then there is no reads-from relationship.
        * In replication flow we don't expect any write write conflict as it 
should have been handled at source.
        */
-      assert true;
+      return CommitInfo.empty();
+    }
+
+    String conflictSQLSuffix = String.format("""
+        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
+        """, OperationType.UPDATE, OperationType.DELETE);
+    String writeSetInsertSql = """
+        INSERT INTO "WRITE_SET"
+          ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID", 
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+        SELECT DISTINCT
+          "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+          :commitId,
+          "TC_OPERATION_TYPE"
+        """;
+
+    boolean isUpdateOrDelete = 
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
+        jdbcResource.getSqlGenerator()
+            .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid),
+        ResultSet::next));
+
+    Long commitId = null;
+    long tempCommitId = 0L;
+    
+    if (isUpdateOrDelete) {
+      tempCommitId = TxnUtils.generateTemporaryId();
+      //if here it means currently committing txn performed update/delete and 
we should check WW conflict
+      /**
+       * "select distinct" is used below because
+       * 1. once we get to multi-statement txns, we only care to record that 
something was updated once
+       * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried 
by caller it may create
+       *  duplicate entries in TXN_COMPONENTS
+       * but we want to add a PK on WRITE_SET which won't have unique rows w/o 
this distinct
+       * even if it includes all of its columns
+       *
+       * First insert into write_set using a temporary commitID, which will be 
updated in a separate call,
+       * see: {@link 
#updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource, long, TxnType, 
Long, long)}}.
+       * This should decrease the scope of the S4U lock on the next_txn_id 
table.
+       */
+      Object undoWriteSetForCurrentTxn = context.createSavepoint();
+      jdbcResource.getJdbcTemplate().update(
+          writeSetInsertSql + (ConfVars.useMinHistoryLevel() ? 
conflictSQLSuffix :
+          "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+              (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND 
\"TC_OPERATION_TYPE\" <> :type")),
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("type", OperationType.COMPACT.getSqlConst())
+              .addValue("commitId", tempCommitId));
+
+      /**
+       * This S4U will mutex with other commitTxn() and openTxns().
+       * -1 below makes txn intervals look like [3,3] [4,4] if all txns are 
serial
+       * Note: it's possible to have several txns have the same commit id.  
Suppose 3 txns start
+       * at the same time and no new txns start until all 3 commit.
+       * We could've incremented the sequence for commitId as well but it 
doesn't add anything functionally.
+       */
+      new AcquireTxnLockFunction(false).execute(jdbcResource);
+      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+
+      if (!rqst.isExclWriteEnabled()) {
+        /**
+         * see if there are any overlapping txns that wrote the same element, 
i.e. have a conflict
+         * Since entire commit operation is mutexed wrt other start/commit ops,
+         * committed.ws_commit_id <= current.ws_commit_id for all txns
+         * thus if committed.ws_commit_id < current.ws_txnid, transactions do 
NOT overlap
+         * For example, [17,20] is committed, [6,80] is being committed right 
now - these overlap
+         * [17,20] committed and [21,21] committing now - these do not overlap.
+         * [17,18] committed and [18,19] committing now - these overlap  (here 
18 started while 17 was still running)
+         */
+        WriteSetInfo info = checkForWriteConflict(jdbcResource, txnid);
+        if (info != null) {
+          handleWriteConflict(jdbcResource, context, 
undoWriteSetForCurrentTxn, info, txnid, commitId,
+              isReplayedReplTxn);
+        }
+      }
+    } else if (!ConfVars.useMinHistoryLevel()) {
+      jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM 
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
+          new MapSqlParameterSource()
+              .addValue("txnId", txnid)
+              .addValue("commitId", jdbcResource.execute(new 
GetHighWaterMarkHandler())));
     }
 
+    return new CommitInfo(tempCommitId, commitId, isUpdateOrDelete);
+  }
+
+  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid)
+      throws MetaException {
+    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
+        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
+        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
+        "\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", 
\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" " +
+        "FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " +
+        "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND 
\"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " +
+        //For partitioned table we always track writes at partition level 
(never at table)
+        //and for non partitioned - always at table level, thus the same table 
should never
+        //have entries with partition key and w/o
+        "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR 
(\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) 
" +
+        "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\" " + 
//txns overlap; could replace ws_txnid
+        // with txnid, though any decent DB should infer this
+        "AND \"CUR\".\"WS_TXNID\"= :txnId " + //make sure RHS of join only has 
rows we just inserted as
+        // part of this commitTxn() op
+        "AND \"COMMITTED\".\"WS_TXNID\" <> :txnId " + //and LHS only has 
committed txns
+        //U+U and U+D and D+D is a conflict and we don't currently track I in 
WRITE_SET at all
+        //it may seem like D+D should not be in conflict but consider 2 
multi-stmt txns
+        //where each does "delete X + insert X, where X is a row with the same 
PK.  This is
+        //equivalent to an update of X but won't be in conflict unless D+D is 
in conflict.
+        //The same happens when Hive splits U=I+D early so it looks like 2 
branches of a
+        //multi-insert stmt (an Insert and a Delete branch).  It also 'feels'
+        // un-serializable to allow concurrent deletes
+        "AND (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete) " +
+        "AND \"CUR\".\"WS_OPERATION_TYPE\" IN(:opUpdate, :opDelete))");
+    LOG.debug("Going to execute query: <{}>", writeConflictQuery);
+    return jdbcResource.getJdbcTemplate().query(writeConflictQuery,
+        new MapSqlParameterSource()
+            .addValue("txnId", txnid)
+            .addValue("opUpdate", OperationType.UPDATE.getSqlConst())
+            .addValue("opDelete", OperationType.DELETE.getSqlConst()),
+        rs -> rs.next()
+            ? new WriteSetInfo(
+                rs.getLong("WS_TXNID"), rs.getLong("WS_COMMIT_ID"),
+                rs.getString("CUR_OP"), rs.getString("COMMITTED_OP"),
+                rs.getString("WS_DATABASE"), rs.getString("WS_TABLE"), 
rs.getString("WS_PARTITION")
+              )
+            : null);
+  }
+
+  private void handleWriteConflict(MultiDataSourceJdbcResource jdbcResource, 
TransactionContext context,
+      Object undoWriteSetForCurrentTxn, WriteSetInfo info, long txnid, Long 
commitId, boolean isReplayedReplTxn)
+      throws MetaException, TxnAbortedException {
+    //found a conflict, so let's abort the txn
+    String committedTxn = "[" + JavaUtils.txnIdToString(info.txnId()) + "," + 
info.commitId() + "]";
+    StringBuilder resource = new 
StringBuilder(info.database()).append("/").append(info.table());
+    if (info.partition() != null) {
+      resource.append('/').append(info.partition());
+    }
+    String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + 
commitId + "]" + " due to a write conflict on " + resource +
+        " committed by " + committedTxn + " " + info.currOpType() + "/" + 
info.opType();
+    //remove WRITE_SET info for current txn since it's about to abort
+    context.rollbackToSavepoint(undoWriteSetForCurrentTxn);
+    LOG.info(msg);
+    //TODO: should make abortTxns() write something into TXNS.TXN_META_INFO 
about this
+    int count = new AbortTxnsFunction(Collections.singletonList(txnid), false, 
false,
+        isReplayedReplTxn, 
TxnErrorMsg.ABORT_WRITE_CONFLICT).execute(jdbcResource);
+    if (count != 1) {
+      throw new IllegalStateException(msg + " FAILED!");
+    }
+    throw new TxnAbortedException(msg);
+  }
+
+  private void handleCompletedTxnComponents(MultiDataSourceJdbcResource 
jdbcResource, long txnid, TxnType txnType,
+      char isUpdateDelete, boolean isReplayedReplTxn, long sourceTxnId) throws 
MetaException {
     if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && 
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
       moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
+
     } else if (isReplayedReplTxn) {
       if (rqst.isSetWriteEventInfos() && !rqst.getWriteEventInfos().isEmpty()) 
{
-        jdbcResource.execute(new InsertCompletedTxnComponentsCommand(txnid, 
isUpdateDelete, rqst.getWriteEventInfos()));
+        jdbcResource.execute(
+            new InsertCompletedTxnComponentsCommand(txnid, isUpdateDelete, 
rqst.getWriteEventInfos()));
       }
-      jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId, 
rqst.getReplPolicy()));
+      jdbcResource.execute(
+          new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
     }
-    updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId, 
tempCommitId);
+  }
 
-    if (rqst.isSetKeyValue()) {
-      updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
-    }
+  private void moveTxnComponentsToCompleted(MultiDataSourceJdbcResource 
jdbcResource, long txnid, char isUpdateDelete) {
+    // Move the record from txn_components into completed_txn_components so 
that the compactor
+    // knows where to look to compact.
+    String query = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", 
\"CTC_DATABASE\", " +
+        "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", 
\"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\", " +
+        "\"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", 
:flag FROM \"TXN_COMPONENTS\" " +
+        "WHERE \"TC_TXNID\" = :txnid AND \"TC_OPERATION_TYPE\" <> :type";
+    //we only track compactor activity in TXN_COMPONENTS to handle the case 
where the
+    //compactor txn aborts - so don't bother copying it to 
COMPLETED_TXN_COMPONENTS
+    LOG.debug("Going to execute insert <{}>", query);
+    int affectedRows = jdbcResource.getJdbcTemplate().update(query,
+        new MapSqlParameterSource()
+            .addValue("flag", Character.toString(isUpdateDelete), Types.CHAR)
+            .addValue("txnid", txnid)
+            .addValue("type", OperationType.COMPACT.getSqlConst(), 
Types.CHAR));
 
-    if (!isHiveReplTxn) {
-      createCommitNotificationEvent(jdbcResource, txnid , txnType, 
txnWriteDetails);
+    if (affectedRows < 1) {
+      //this can be reasonable for an empty txn START/COMMIT or read-only txn
+      //also an IUD with DP that didn't match any rows.
+      LOG.info("Expected to move at least one record from txn_components to "
+          + "completed_txn_components when committing txn! {}", 
JavaUtils.txnIdToString(txnid));
     }
+  }
 
-    LOG.debug("Going to commit");
-
-    if (MetastoreConf.getBoolVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_COMMITTED_TXNS).inc();
-    }
-    return txnType;
+  private static char toUpdateDeleteFlag(boolean isUpdateOrDelete) {
+    return isUpdateOrDelete ? 'Y' : 'N';

Review Comment:
   How about putting this logic inside the `CommitInfo` class:
   
   ```java
   public char opFlag() {
     return isUpdateOrDelete ? 'Y' : 'N';
   }
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long 
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, db);
-      return dbEntityParam.id;
+      return dbEntityParam.id();
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for table: {}  using command {}", prop, table, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("tblId", dbEntityParam.id)
+        .addValue("tblId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("TABLE_PARAMS is corrupted for table: " + 
table);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
   
   private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, 
long tableId,
-                                   List<String> partList, String prop, String 
propValue) {
+      List<String> partList, String prop, String propValue) {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
     //language=SQL
-    prefix.append(
-        "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM 
\"PARTITION_PARAMS\" pp\n" +
-        "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE 
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+    prefix.append("""
+        SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM 
"PARTITION_PARAMS" pp
+        RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE 
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
 
     // Populate the complete query with provided prefix and suffix
     TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries, 
prefix, suffix, partList,
         "\"PART_NAME\"", true, false);
+
     SqlParameterSource params = new MapSqlParameterSource()
         .addValue("tblId", tableId)
         .addValue("key", prop);
+
+    RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+        new DbEntityParam(
+            rs.getLong("PART_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+        );
     List<DbEntityParam> partitionParams = new ArrayList<>();
-    for(String query : queries) {
+
+    for (String query : queries) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + query + ">");
       }
-      jdbcResource.getJdbcTemplate().query(query, params,
-          (ResultSet rs) -> {
-            while (rs.next()) {
-              partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
-            }
-          });
+      partitionParams.addAll(
+          jdbcResource.getJdbcTemplate().query(query, params, mapper)
+      );
     }
 
     //TODO: would be better to replace with MERGE or UPSERT
     int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
     //all insert in one batch
     int[][] inserts = 
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
-        partitionParams.stream().filter(p -> p.key == 
null).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() == null)
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
-          ps.setLong(1, argument.id);
-          ps.setString(2, argument.key);
+          ps.setLong(1, argument.id());
+          ps.setString(2, argument.key());
           ps.setString(3, propValue);
         });
     //all update in one batch
     int[][] updates 
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\" 
= ? AND \"PARAM_KEY\" = ?",
-        partitionParams.stream().filter(p -> p.key != null && 
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() != null && !propValue.equals(p.value()))
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
           ps.setString(1, propValue);
-          ps.setLong(2, argument.id);
-          ps.setString(3, argument.key);
+          ps.setLong(2, argument.id());
+          ps.setString(3, argument.key());
         });
 
-    if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() + 
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+    int totalChanges =
+        Arrays.stream(inserts)
+            .flatMapToInt(IntStream::of)
+            .sum()
+      + Arrays.stream(updates)
+            .flatMapToInt(IntStream::of)
+            .sum();
+
+    if (totalChanges != partList.size()) {
       throw new RuntimeException("PARTITION_PARAMS is corrupted, update 
failed");      
     }    
   }
 
-  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid) throws MetaException {
-    String writeConflictQuery = 
jdbcResource.getSqlGenerator().addLimitClause(1, 
-        "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " +
-        "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", 
\"COMMITTED\".\"WS_PARTITION\", " +
-        "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", 
\"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " +

Review Comment:
   Why do we remove `CUR_WS_COMMIT_ID`?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -94,169 +97,115 @@ public CommitTxnFunction(CommitTxnRequest rqst, 
List<TransactionalMetaStoreEvent
 
   @Override
   public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws 
MetaException, NoSuchTxnException, TxnAbortedException {
-    char isUpdateDelete = 'N';
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
 
-    boolean isReplayedReplTxn = 
TxnType.REPL_CREATED.equals(rqst.getTxn_type());
-    boolean isHiveReplTxn = rqst.isSetReplPolicy() && 
TxnType.DEFAULT.equals(rqst.getTxn_type());
+    boolean isReplayedReplTxn = TxnType.REPL_CREATED == rqst.getTxn_type();
+    boolean isHiveReplTxn = rqst.isSetReplPolicy() && TxnType.DEFAULT == 
rqst.getTxn_type();
     //Find the write details for this transaction.
     //Doing it here before the metadata tables are updated below.
-    List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
-
-    if (!isHiveReplTxn) {
-      txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
+    List<TxnWriteDetails> txnWriteDetails = loadTxnWriteDetails(jdbcResource, 
isHiveReplTxn, txnid);
 
-    }
     // Get the current TXN
     TransactionContext context = 
jdbcResource.getTransactionManager().getActiveTransaction();
-    Long commitId = null;
 
     if (rqst.isSetReplLastIdInfo()) {
       updateReplId(jdbcResource, rqst.getReplLastIdInfo());
     }
 
     if (isReplayedReplTxn) {
       assert (rqst.isSetReplPolicy());
+      txnid = resolveTargetTxnId(jdbcResource, rqst.getTxnid());
       sourceTxnId = rqst.getTxnid();
-      List<Long> targetTxnIds = jdbcResource.execute(new 
TargetTxnIdListHandler(rqst.getReplPolicy(), 
Collections.singletonList(sourceTxnId)));
-      if (targetTxnIds.isEmpty()) {
-        // Idempotent case where txn was already closed or commit txn event 
received without
-        // corresponding open txn event.
-        LOG.info("Target txn id is missing for source txn id : {} and repl 
policy {}", sourceTxnId,
-            rqst.getReplPolicy());
-        throw new RollbackException(null);
-      }
-      assert targetTxnIds.size() == 1;
-      txnid = targetTxnIds.getFirst();
     }
 
-    /**
-     * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures 
that no other
-     * operation can change this txn (such acquiring locks). While lock() and 
commitTxn()
-     * should not normally run concurrently (for same txn) but could due to 
bugs in the client
-     * which could then corrupt internal transaction manager state.  Also 
competes with abortTxn().
-     */
-    TxnType txnType = jdbcResource.execute(new 
GetOpenTxnTypeAndLockHandler(jdbcResource.getSqlGenerator(), txnid));
+    TxnType txnType = getOpenTxnTypeAndLock(jdbcResource, txnid, 
isReplayedReplTxn);
     if (txnType == null) {
-      //if here, txn was not found (in expected state)
-      TxnStatus actualTxnStatus = jdbcResource.execute(new 
FindTxnStateHandler(txnid));
-      if (actualTxnStatus == TxnStatus.COMMITTED) {
-        if (isReplayedReplTxn) {
-          // in case of replication, idempotent is taken care by getTargetTxnId
-          LOG.warn("Invalid state COMMITTED for transactions started using 
replication replay task");
-        }
-        /**
-         * This makes the operation idempotent
-         * (assume that this is most likely due to retry logic)
-         */
-        LOG.info("Nth commitTxn({}) msg", JavaUtils.txnIdToString(txnid));
-        return null;
-      }
-      TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
+      return null;
     }
 
-    String conflictSQLSuffix = String.format("""
-        FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND 
"TC_OPERATION_TYPE" IN (%s, %s)
-        """, OperationType.UPDATE, OperationType.DELETE);
-    long tempCommitId = TxnUtils.generateTemporaryId();
+    CommitInfo commitInfo = prepareWriteSetAndCheckConflicts(
+        jdbcResource, context, txnid, txnType, isReplayedReplTxn);
+    char isUpdateDelete = toUpdateDeleteFlag(commitInfo.isUpdateOrDelete());
 
-    if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
-      if (!ConfVars.useMinHistoryWriteId()) {
-        new AcquireTxnLockFunction(false).execute(jdbcResource);
-      }
-      commitId = jdbcResource.execute(new GetHighWaterMarkHandler());

Review Comment:
   Was this lock and watermark logic moved somewhere, or was it removed 
completely?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java:
##########
@@ -390,150 +508,102 @@ private long 
updateTableProp(MultiDataSourceJdbcResource jdbcResource, String ca
 
     //TODO: would be better to replace with MERGE or UPSERT
     String command;
-    if (dbEntityParam.key == null) {
+    if (dbEntityParam.key() == null) {
       command = "INSERT INTO \"TABLE_PARAMS\" VALUES (:tblId, :key, :value)";
-    } else if (!dbEntityParam.value.equals(propValue)) {
+    } else if (!dbEntityParam.value().equals(propValue)) {
       command = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = :value WHERE 
\"TBL_ID\" = :dbId AND \"PARAM_KEY\" = :key";
     } else {
       LOG.info("Database property: {} with value: {} already updated for db: 
{}", prop, propValue, db);
-      return dbEntityParam.id;
+      return dbEntityParam.id();
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating {} for table: {}  using command {}", prop, table, 
command);
     }
     SqlParameterSource params = new MapSqlParameterSource()
-        .addValue("tblId", dbEntityParam.id)
+        .addValue("tblId", dbEntityParam.id())
         .addValue("key", prop)
         .addValue("value", propValue);
     if (jdbcResource.getJdbcTemplate().update(command, params) != 1) {
       //only one row insert or update should happen
       throw new RuntimeException("TABLE_PARAMS is corrupted for table: " + 
table);
     }
-    return dbEntityParam.id;
+    return dbEntityParam.id();
   }
   
   private void updatePartitionProp(MultiDataSourceJdbcResource jdbcResource, 
long tableId,
-                                   List<String> partList, String prop, String 
propValue) {
+      List<String> partList, String prop, String propValue) {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
     //language=SQL
-    prefix.append(
-        "SELECT p.\"PART_ID\", pp.\"PARAM_KEY\", pp.\"PARAM_VALUE\" FROM 
\"PARTITION_PARAMS\" pp\n" +
-        "RIGHT JOIN \"PARTITIONS\" p ON pp.\"PART_ID\" = p.\"PART_ID\" WHERE 
p.\"TBL_ID\" = :tblId AND pp.\"PARAM_KEY\" = :key");
+    prefix.append("""
+        SELECT p."PART_ID", pp."PARAM_KEY", pp."PARAM_VALUE" FROM 
"PARTITION_PARAMS" pp
+        RIGHT JOIN "PARTITIONS" p ON pp."PART_ID" = p."PART_ID" WHERE 
p."TBL_ID" = :tblId AND pp."PARAM_KEY" = :key""");
 
     // Populate the complete query with provided prefix and suffix
     TxnUtils.buildQueryWithINClauseStrings(jdbcResource.getConf(), queries, 
prefix, suffix, partList,
         "\"PART_NAME\"", true, false);
+
     SqlParameterSource params = new MapSqlParameterSource()
         .addValue("tblId", tableId)
         .addValue("key", prop);
+
+    RowMapper<DbEntityParam> mapper = (rs, rowNum) ->
+        new DbEntityParam(
+            rs.getLong("PART_ID"), rs.getString("PARAM_KEY"), 
rs.getString("PARAM_VALUE")
+        );
     List<DbEntityParam> partitionParams = new ArrayList<>();
-    for(String query : queries) {
+
+    for (String query : queries) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + query + ">");
       }
-      jdbcResource.getJdbcTemplate().query(query, params,
-          (ResultSet rs) -> {
-            while (rs.next()) {
-              partitionParams.add(new DbEntityParam(rs.getLong("PART_ID"), 
rs.getString("PARAM_KEY"), rs.getString("PARAM_VALUE")));
-            }
-          });
+      partitionParams.addAll(
+          jdbcResource.getJdbcTemplate().query(query, params, mapper)
+      );
     }
 
     //TODO: would be better to replace with MERGE or UPSERT
     int maxBatchSize = MetastoreConf.getIntVar(jdbcResource.getConf(), 
MetastoreConf.ConfVars.JDBC_MAX_BATCH_SIZE);
     //all insert in one batch
     int[][] inserts = 
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "INSERT INTO \"PARTITION_PARAMS\" VALUES (?, ?, ?)",
-        partitionParams.stream().filter(p -> p.key == 
null).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() == null)
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
-          ps.setLong(1, argument.id);
-          ps.setString(2, argument.key);
+          ps.setLong(1, argument.id());
+          ps.setString(2, argument.key());
           ps.setString(3, propValue);
         });
     //all update in one batch
     int[][] updates 
=jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(
         "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"PART_ID\" 
= ? AND \"PARAM_KEY\" = ?",
-        partitionParams.stream().filter(p -> p.key != null && 
!propValue.equals(p.value)).collect(Collectors.toList()), maxBatchSize,
+        partitionParams.stream()
+            .filter(p -> p.key() != null && !propValue.equals(p.value()))
+            .toList(),
+        maxBatchSize,
         (ps, argument) -> {
           ps.setString(1, propValue);
-          ps.setLong(2, argument.id);
-          ps.setString(3, argument.key);
+          ps.setLong(2, argument.id());
+          ps.setString(3, argument.key());
         });
 
-    if (Arrays.stream(inserts).flatMapToInt(IntStream::of).sum() + 
Arrays.stream(updates).flatMapToInt(IntStream::of).sum() != partList.size()) {
+    int totalChanges =
+        Arrays.stream(inserts)
+            .flatMapToInt(IntStream::of)
+            .sum()
+      + Arrays.stream(updates)
+            .flatMapToInt(IntStream::of)
+            .sum();
+
+    if (totalChanges != partList.size()) {
       throw new RuntimeException("PARTITION_PARAMS is corrupted, update 
failed");      
     }    
   }
 
-  private WriteSetInfo checkForWriteConflict(MultiDataSourceJdbcResource 
jdbcResource, long txnid) throws MetaException {

Review Comment:
   In addition, since the SQL query is mostly intact, avoid adding or removing 
insignificant spaces/tabs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to