This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 906f61cb997 HIVE-29077: Reduce HMS calls while adding entries into the 
transactionListeners while abortTxn (#5946)
906f61cb997 is described below

commit 906f61cb9972ac642fbf4ac8e01ab6be29f2df11
Author: Harshal Patel <109334642+harshal...@users.noreply.github.com>
AuthorDate: Sat Jul 12 18:07:10 2025 +0530

    HIVE-29077: Reduce HMS calls while adding entries into the 
transactionListeners while abortTxn (#5946)
---
 .../TestTransactionalDbNotificationListener.java   | 34 ++++++++++++
 .../ql/parse/repl/dump/events/AbortTxnHandler.java |  8 +--
 .../parse/repl/dump/events/CommitTxnHandler.java   | 14 +++--
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 57 +++++++-------------
 .../txn/jdbc/functions/CommitTxnFunction.java      |  8 +--
 .../jdbc/functions/PerformTimeoutsFunction.java    | 13 ++---
 .../txn/jdbc/queries/GetTxnDbsUpdatedHandler.java  | 62 ----------------------
 ...ava => GetWriteIdsMappingForTxnIdsHandler.java} | 21 ++++----
 8 files changed, 80 insertions(+), 137 deletions(-)

diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
index 7d8872ba986..0552d577a37 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
@@ -21,7 +21,12 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConfForTest;
@@ -161,4 +166,33 @@ public void commitTxn() throws Exception {
         assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
     }
 
+    @Test
+    public void commitTxnWithAllocateWriteID() throws Exception {
+        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        msClient.commitTxn(txnId1);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(0, rsp.getEventsSize());
+
+        msClient.allocateTableWriteId(txnId2, "test", "t1");
+        msClient.commitTxn(txnId2);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        NotificationEvent commitEvent = rsp.getEvents().get(1);
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        Map<String, Object> commitMessage = 
objectMapper.readValue(commitEvent.getMessage(), Map.class);
+        assertEquals(List.of(1), commitMessage.get("writeIds"));
+        assertEquals(List.of("test"), commitMessage.get("databases"));
+
+        assertEquals(2, rsp.getEventsSize()); // alloc_write_id and commit_txn 
events
+
+        assertEquals(firstEventId + 3, commitEvent.getEventId());
+        assertTrue(commitEvent.getEventTime() >= startTime);
+        assertEquals(EventType.COMMIT_TXN.toString(), 
commitEvent.getEventType());
+    }
+
 }
\ No newline at end of file
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
index 5847d164fbb..67dd4b5dfc3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
@@ -17,20 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
-import com.google.common.collect.Collections2;
-import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {
 
@@ -55,7 +49,7 @@ public void handle(Context withinContext) throws Exception {
       List<String> dbsUpdated = eventMessage.getDbsUpdated()
                                               .stream()
                                               
.map(StringUtils::normalizeIdentifier)
-                                              .collect(Collectors.toList());
+                                              .toList();
       if ((writeIds == null || writeIds.isEmpty() || 
!dbsUpdated.contains(contextDbName))) {
         LOG.info("Filter out #{} ABORT_TXN message : {}", fromEventId(), 
eventMessageAsJSON);
         return;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index ed554245596..0d3a3d4c901 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -23,11 +23,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -62,7 +60,7 @@ CommitTxnMessage eventMessage(String stringRepresentation) {
 
   private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> 
files, Context withinContext,
                               Path dataPath)
-          throws IOException, LoginException, MetaException, 
HiveFatalException, SemanticException {
+          throws IOException, LoginException, HiveFatalException, 
SemanticException {
     boolean copyAtLoad = 
withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
     if (copyAtLoad) {
       // encoded filename/checksum of files, write into _files
@@ -76,7 +74,7 @@ private void writeDumpFiles(Table qlMdTable, Partition ptn, 
Iterable<String> fil
 
   private void createDumpFile(Context withinContext, 
org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
                   List<Partition> qlPtns, List<List<String>> fileListArray)
-          throws IOException, SemanticException, LoginException, 
MetaException, HiveFatalException {
+          throws IOException, SemanticException, LoginException, 
HiveFatalException {
     if (fileListArray == null || fileListArray.isEmpty()) {
       return;
     }
@@ -92,7 +90,7 @@ private void createDumpFile(Context withinContext, 
org.apache.hadoop.hive.ql.met
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
       Path dataPath = new Path(withinContext.eventRoot, 
EximUtil.DATA_PATH_NAME);
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, 
dataPath);
+      writeDumpFiles(qlMdTable, null, fileListArray.getFirst(), withinContext, 
dataPath);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
         Path dataPath = new Path(withinContext.eventRoot, 
EximUtil.DATA_PATH_NAME + File.separator
@@ -104,7 +102,7 @@ private void createDumpFile(Context withinContext, 
org.apache.hadoop.hive.ql.met
 
   private void createDumpFileForTable(Context withinContext, 
org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
                     List<Partition> qlPtns, List<List<String>> fileListArray)
-          throws IOException, SemanticException, LoginException, 
MetaException, HiveFatalException {
+          throws IOException, SemanticException, LoginException, 
HiveFatalException {
     Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, 
qlMdTable.getDbName(), qlMdTable.getTableName());
     Context context = new Context(withinContext);
     context.setEventRoot(newPath);
@@ -223,11 +221,11 @@ public void handle(Context withinContext) throws 
Exception {
       if (numEntry != 0) {
         eventMessage.addWriteEventInfo(writeEventInfoList);
         payload = jsonMessageEncoder.getSerializer().serialize(eventMessage);
-        LOG.debug("payload for commit txn event : " + eventMessageAsJSON);
+          LOG.debug("payload for commit txn event : {}", eventMessageAsJSON);
       }
 
       org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
-      org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null;
+      org.apache.hadoop.hive.ql.metadata.Table qlMdTable;
       List<Partition> qlPtns = new ArrayList<>();
       List<List<String>> filesTobeAdded = new ArrayList<>();
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e12a2d9cf12..26d7c0f2ee1 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -123,6 +123,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -133,7 +134,6 @@
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
 import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 /**
@@ -520,7 +520,7 @@ public void abortTxn(AbortTxnRequest rqst) throws 
NoSuchTxnException, MetaExcept
     if (transactionalListeners != null) {
       //Find the write details for this transaction.
       //Doing it here before the metadata tables are updated below.
-      txnWriteDetails = getWriteIdsForTxnID(rqst.getTxnid());
+      txnWriteDetails = getWriteIdsMappingForTxns(Set.of(rqst.getTxnid()));
     }
     TxnType txnType = new AbortTxnFunction(rqst).execute(jdbcResource); 
     if (txnType != null) {
@@ -534,10 +534,10 @@ public static void notifyCommitOrAbortEvent(long txnId, 
EventMessage.EventType e
                                        List<TxnWriteDetails> txnWriteDetails, 
List<TransactionalMetaStoreEventListener> transactionalListeners) throws 
MetaException {
     List<Long> writeIds = txnWriteDetails.stream()
             .map(TxnWriteDetails::getWriteId)
-            .collect(Collectors.toList());
+            .toList();
     List<String> databases = txnWriteDetails.stream()
             .map(TxnWriteDetails::getDbName)
-            .collect(Collectors.toList());
+            .toList();
     ListenerEvent txnEvent;
     if (eventType.equals(EventMessage.EventType.ABORT_TXN)) {
       txnEvent = new AbortTxnEvent(txnId, txnType, null, databases, writeIds);
@@ -560,8 +560,10 @@ public void abortTxns(AbortTxnsRequest rqst) throws 
MetaException {
     if (transactionalListeners != null) {
       //Find the write details for this transaction.
       //Doing it here before the metadata tables are updated below.
-      for(Long txnId : txnIds)
-        txnWriteDetailsMap.put(txnId, getWriteIdsForTxnID(txnId));
+      List<TxnWriteDetails> txnWriteDetails = getWriteIdsMappingForTxns(new 
HashSet<>(txnIds));
+      txnWriteDetailsMap.putAll(txnWriteDetails.stream()
+                                               
.collect(Collectors.groupingBy(TxnWriteDetails::getTxnId)));
+
     }
 
     List<String> queries = new ArrayList<>();
@@ -595,8 +597,8 @@ public void abortTxns(AbortTxnsRequest rqst) throws 
MetaException {
 
       if (transactionalListeners != null) {
         for (Long txnId : txnIds) {
-          notifyCommitOrAbortEvent(txnId,EventMessage.EventType.ABORT_TXN,
-                  nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), 
dbConn, txnWriteDetailsMap.get(txnId), transactionalListeners);
+          notifyCommitOrAbortEvent(txnId, EventMessage.EventType.ABORT_TXN,
+                  nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), 
dbConn, txnWriteDetailsMap.getOrDefault(txnId, new ArrayList<>()), 
transactionalListeners);
         }
       }
     } catch (SQLException e) {
@@ -826,7 +828,7 @@ public LockResponse checkLock(CheckLockRequest rqst)
     if (CollectionUtils.isEmpty(lockInfos)) {
       throw new NoSuchLockException("No such lock " + 
JavaUtils.lockIdToString(extLockId));
     }
-    LockInfo lockInfo = lockInfos.get(0);
+    LockInfo lockInfo = lockInfos.getFirst();
     if (lockInfo.getTxnId() > 0) {
       new HeartbeatTxnFunction(lockInfo.getTxnId()).execute(jdbcResource);
     } else {
@@ -1138,7 +1140,7 @@ private boolean checkIfTableIsUsable(String tableName, 
boolean configValue) {
       jdbcResource.getJdbcTemplate().query("SELECT 1 FROM \"" + tableName + 
"\"",
           new MapSqlParameterSource(), ResultSet::next);
     } catch (DataAccessException e) {
-      LOG.debug("Catching sql exception in " + tableName + " check", e);
+        LOG.debug("Catching sql exception in {} check", tableName, e);
       if (e.getCause() instanceof SQLException) {
         if (dbProduct.isTableNotExistsError(e)) {
           return false;
@@ -1154,42 +1156,21 @@ private boolean checkIfTableIsUsable(String tableName, 
boolean configValue) {
   }
 
   /**
-   * Returns the databases updated by txnId.
-   * Queries TXN_TO_WRITE_ID using txnId.
+   * Returns the write details (txn ID, database and write ID) recorded for the given txnIds.
+   * Queries TXN_TO_WRITE_ID once for the whole set of txnIds.
    *
-   * @param txnId
-   * @throws MetaException
+   * @param txnIds Transaction IDs for which write IDs are requested.
+   * @throws MetaException if the lookup fails; any other TException is rewrapped as a MetaException
    */
-  private List<String> getTxnDbsUpdated(long txnId) throws MetaException {
+  private List<TxnWriteDetails> getWriteIdsMappingForTxns(Set<Long> txnIds) 
throws MetaException {
     try {
       return sqlRetryHandler.executeWithRetry(
-          new SqlRetryCallProperties().withCallerId("GetTxnDbsUpdatedHandler"),
-          () -> jdbcResource.execute(new GetTxnDbsUpdatedHandler(txnId)));
+              new 
SqlRetryCallProperties().withCallerId("GetWriteIdsMappingForTxnIdsHandler"),
+              () -> jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(txnIds)));
     } catch (MetaException e) {
       throw e;
     } catch (TException e) {
       throw new MetaException(e.getMessage());
     }
   }
-
-  /**
-   * Returns the databases and writeID updated by txnId.
-   * Queries TXN_TO_WRITE_ID using txnId.
-   *
-   * @param txnId Transaction ID for which write IDs are requested.
-   * @throws MetaException
-   */
-  public List<TxnWriteDetails> getWriteIdsForTxnID(long txnId) throws 
MetaException {
-    try {
-      return sqlRetryHandler.executeWithRetry(
-              new 
SqlRetryCallProperties().withCallerId("GetWriteIdsForTxnIDHandler"),
-              () -> jdbcResource.execute(new 
GetWriteIdsForTxnIDHandler(txnId)));
-    } catch (MetaException e) {
-      throw e;
-    } catch (TException e) {
-      throw new MetaException(e.getMessage());
-    }
-  }
-
-
 }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
index ac90f5ce61e..bcd5226e014 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent;
-import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -50,7 +49,7 @@
 import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetCompactionInfoHandler;
 import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetHighWaterMarkHandler;
 import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetOpenTxnTypeAndLockHandler;
-import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
+import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler;
 import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.TargetTxnIdListHandler;
 import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
 import org.apache.hadoop.hive.metastore.txn.jdbc.RollbackException;
@@ -71,6 +70,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -103,7 +103,7 @@ public TxnType execute(MultiDataSourceJdbcResource 
jdbcResource) throws MetaExce
     List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
 
     if (!isHiveReplTxn) {
-      txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsForTxnIDHandler(rqst.getTxnid()));
+      txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
 
     }
     // Get the current TXN
@@ -585,7 +585,7 @@ private void 
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
   }
 
   /**
-   * Create Notifiaction Events on txn commit
+   * Create Notification Events on txn commit
    *
    * @param txnid committed txn
    * @param txnType transaction type
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
index c70985ec544..f6af1251483 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
 
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
 import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -33,8 +31,7 @@
 import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
 import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
 import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
-import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetTxnDbsUpdatedHandler;
-import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
+import 
org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
@@ -46,6 +43,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
@@ -131,10 +129,13 @@ public Void execute(MultiDataSourceJdbcResource 
jdbcResource) {
             //todo: add TXNS.COMMENT filed and set it to 'aborted by system 
due to timeout'
             LOG.info("Aborted the following transactions due to timeout: {}", 
batchToAbort);
             if (transactionalListeners != null) {
+              List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new 
GetWriteIdsMappingForTxnIdsHandler(batchToAbort.keySet()));
+              Map<Long, List<TxnWriteDetails>> txnWriteDetailsMap =
+                      txnWriteDetails.stream()
+                              
.collect(Collectors.groupingBy(TxnWriteDetails::getTxnId));
               for (Map.Entry<Long, TxnType> txnEntry : 
batchToAbort.entrySet()) {
-                List<TxnWriteDetails> txnWriteDetails = 
jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnEntry.getKey()));
                 notifyCommitOrAbortEvent(txnEntry.getKey(), 
EventMessage.EventType.ABORT_TXN , txnEntry.getValue(),
-                        jdbcResource.getConnection(), txnWriteDetails, 
transactionalListeners);
+                        jdbcResource.getConnection(), 
txnWriteDetailsMap.getOrDefault(txnEntry.getKey(), new ArrayList<>()), 
transactionalListeners);
               }
               LOG.debug("Added Notifications for the transactions that are 
aborted due to timeout: {}", batchToAbort);
             }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java
deleted file mode 100644
index 4600064afc3..00000000000
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
-
-import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Returns the databases updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
- */
-public class GetTxnDbsUpdatedHandler implements QueryHandler<List<String>> {
-  
-  private final long txnId;
-
-  public GetTxnDbsUpdatedHandler(long txnId) {
-    this.txnId = txnId;
-  }
-
-  @Override
-  public String getParameterizedQueryString(DatabaseProduct databaseProduct) 
throws MetaException {
-    return "SELECT DISTINCT \"T2W_DATABASE\" FROM \"TXN_TO_WRITE_ID\" 
\"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId";
-  }
-
-  @Override
-  public SqlParameterSource getQueryParameters() {
-    return new MapSqlParameterSource().addValue("txnId", txnId);
-  }
-
-  @Override
-  public List<String> extractData(ResultSet rs) throws SQLException, 
DataAccessException {
-    List<String> dbsUpdated = new ArrayList<>();
-    while (rs.next()) {
-      dbsUpdated.add(rs.getString(1));
-    }
-    return dbsUpdated;
-  }
-}
\ No newline at end of file
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
similarity index 75%
rename from 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java
rename to 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
index c3a69fc52b0..4d680c981ad 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
@@ -27,39 +27,36 @@
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
-/**
- * Returns the databases and writeID updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
- */
-public class GetWriteIdsForTxnIDHandler implements 
QueryHandler<List<TxnWriteDetails>> {
+public class GetWriteIdsMappingForTxnIdsHandler  implements 
QueryHandler<List<TxnWriteDetails>> {
 
-    private final long txnId;
+    private final Set<Long> txnIds;
 
-    public GetWriteIdsForTxnIDHandler(long txnId) {
-        this.txnId = txnId;
+    public GetWriteIdsMappingForTxnIdsHandler(Set<Long> txnIds) {
+        this.txnIds= txnIds;
     }
 
     @Override
     public String getParameterizedQueryString(DatabaseProduct databaseProduct) 
throws MetaException {
-        return "SELECT DISTINCT \"T2W_DATABASE\", \"T2W_WRITEID\" FROM 
\"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId";
+        return "SELECT DISTINCT \"T2W_TXNID\", \"T2W_DATABASE\", 
\"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" IN 
(:txnIds)";
     }
 
     @Override
     public SqlParameterSource getQueryParameters() {
-        return new MapSqlParameterSource().addValue("txnId", txnId);
+        return new MapSqlParameterSource().addValue("txnIds", txnIds, 
Types.BIGINT);
     }
 
     @Override
     public List<TxnWriteDetails> extractData(ResultSet rs) throws 
SQLException, DataAccessException {
         List<TxnWriteDetails> dbsUpdated = new ArrayList<>();
         while (rs.next()) {
-            TxnWriteDetails entry = new TxnWriteDetails(txnId, 
rs.getString(1), rs.getLong(2));
+            TxnWriteDetails entry = new TxnWriteDetails(rs.getLong(1), 
rs.getString(2), rs.getLong(3));
             dbsUpdated.add(entry);
         }
         return dbsUpdated;
     }
 }
-

Reply via email to