This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 906f61cb997 HIVE-29077: Reduce HMS calls while adding entries into the transactionListeners while abortTxn (#5946) 906f61cb997 is described below commit 906f61cb9972ac642fbf4ac8e01ab6be29f2df11 Author: Harshal Patel <109334642+harshal...@users.noreply.github.com> AuthorDate: Sat Jul 12 18:07:10 2025 +0530 HIVE-29077: Reduce HMS calls while adding entries into the transactionListeners while abortTxn (#5946) --- .../TestTransactionalDbNotificationListener.java | 34 ++++++++++++ .../ql/parse/repl/dump/events/AbortTxnHandler.java | 8 +-- .../parse/repl/dump/events/CommitTxnHandler.java | 14 +++-- .../hadoop/hive/metastore/txn/TxnHandler.java | 57 +++++++------------- .../txn/jdbc/functions/CommitTxnFunction.java | 8 +-- .../jdbc/functions/PerformTimeoutsFunction.java | 13 ++--- .../txn/jdbc/queries/GetTxnDbsUpdatedHandler.java | 62 ---------------------- ...ava => GetWriteIdsMappingForTxnIdsHandler.java} | 21 ++++---- 8 files changed, 80 insertions(+), 137 deletions(-) diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java index 7d8872ba986..0552d577a37 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java @@ -21,7 +21,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConfForTest; @@ -161,4 +166,33 @@ public void commitTxn() throws Exception { assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType()); } + @Test + public void commitTxnWithAllocateWriteID() throws Exception { + long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY); + long txnId2 = msClient.openTxn("me", TxnType.DEFAULT); + + NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); + assertEquals(1, rsp.getEventsSize()); + + msClient.commitTxn(txnId1); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + assertEquals(0, rsp.getEventsSize()); + + msClient.allocateTableWriteId(txnId2, "test", "t1"); + msClient.commitTxn(txnId2); + rsp = msClient.getNextNotification(firstEventId + 1, 0, null); + NotificationEvent commitEvent = rsp.getEvents().get(1); + + ObjectMapper objectMapper = new ObjectMapper(); + Map<String, Object> commitMessage = objectMapper.readValue(commitEvent.getMessage(), Map.class); + assertEquals(List.of(1), commitMessage.get("writeIds")); + assertEquals(List.of("test"), commitMessage.get("databases")); + + assertEquals(2, rsp.getEventsSize()); // alloc_write_id and commit_txn events + + assertEquals(firstEventId + 3, commitEvent.getEventId()); + assertTrue(commitEvent.getEventTime() >= startTime); + assertEquals(EventType.COMMIT_TXN.toString(), commitEvent.getEventType()); + } + } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java index 5847d164fbb..67dd4b5dfc3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java @@ -17,20 +17,14 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; -import com.google.common.collect.Collections2; -import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; -import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> { @@ -55,7 +49,7 @@ public void handle(Context withinContext) throws Exception { List<String> dbsUpdated = eventMessage.getDbsUpdated() .stream() .map(StringUtils::normalizeIdentifier) - .collect(Collectors.toList()); + .toList(); if ((writeIds == null || writeIds.isEmpty() || !dbsUpdated.contains(contextDbName))) { LOG.info("Filter out #{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON); return; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index ed554245596..0d3a3d4c901 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -23,11 +23,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; -import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -62,7 +60,7 @@ CommitTxnMessage eventMessage(String stringRepresentation) { private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext, Path dataPath) - throws IOException, LoginException, MetaException, HiveFatalException, SemanticException { + throws IOException, LoginException, HiveFatalException, SemanticException { boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET); if (copyAtLoad) { // encoded filename/checksum of files, write into _files @@ -76,7 +74,7 @@ private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> fil private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, List<Partition> qlPtns, List<List<String>> fileListArray) - throws IOException, SemanticException, LoginException, MetaException, HiveFatalException { + throws IOException, SemanticException, LoginException, HiveFatalException { if (fileListArray == null || fileListArray.isEmpty()) { return; } @@ -92,7 +90,7 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met if ((null == qlPtns) || qlPtns.isEmpty()) { Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); - writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath); + writeDumpFiles(qlMdTable, null, fileListArray.getFirst(), withinContext, dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator @@ -104,7 +102,7 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, List<Partition> qlPtns, List<List<String>> fileListArray) - throws IOException, SemanticException, LoginException, MetaException, HiveFatalException { + throws IOException, SemanticException, LoginException, HiveFatalException { Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName()); Context context = new Context(withinContext); context.setEventRoot(newPath); @@ -223,11 +221,11 @@ public void handle(Context withinContext) throws Exception { if (numEntry != 0) { eventMessage.addWriteEventInfo(writeEventInfoList); payload = jsonMessageEncoder.getSerializer().serialize(eventMessage); - LOG.debug("payload for commit txn event : " + eventMessageAsJSON); + LOG.debug("payload for commit txn event : {}", eventMessageAsJSON); } org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null; - org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null; + org.apache.hadoop.hive.ql.metadata.Table qlMdTable; List<Partition> qlPtns = new ArrayList<>(); List<List<String>> filesTobeAdded = new ArrayList<>(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index e12a2d9cf12..26d7c0f2ee1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -123,6 +123,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -133,7 +134,6 @@ import java.util.function.BiPredicate; import java.util.stream.Collectors; -import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; /** @@ -520,7 +520,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept if (transactionalListeners != null) { //Find the write details for this transaction. //Doing it here before the metadata tables are updated below. - txnWriteDetails = getWriteIdsForTxnID(rqst.getTxnid()); + txnWriteDetails = getWriteIdsMappingForTxns(Set.of(rqst.getTxnid())); } TxnType txnType = new AbortTxnFunction(rqst).execute(jdbcResource); if (txnType != null) { @@ -534,10 +534,10 @@ public static void notifyCommitOrAbortEvent(long txnId, EventMessage.EventType e List<TxnWriteDetails> txnWriteDetails, List<TransactionalMetaStoreEventListener> transactionalListeners) throws MetaException { List<Long> writeIds = txnWriteDetails.stream() .map(TxnWriteDetails::getWriteId) - .collect(Collectors.toList()); + .toList(); List<String> databases = txnWriteDetails.stream() .map(TxnWriteDetails::getDbName) - .collect(Collectors.toList()); + .toList(); ListenerEvent txnEvent; if (eventType.equals(EventMessage.EventType.ABORT_TXN)) { txnEvent = new AbortTxnEvent(txnId, txnType, null, databases, writeIds); @@ -560,8 +560,10 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException { if (transactionalListeners != null) { //Find the write details for this transaction. //Doing it here before the metadata tables are updated below. - for(Long txnId : txnIds) - txnWriteDetailsMap.put(txnId, getWriteIdsForTxnID(txnId)); + List<TxnWriteDetails> txnWriteDetails = getWriteIdsMappingForTxns(new HashSet<>(txnIds)); + txnWriteDetailsMap.putAll(txnWriteDetails.stream() + .collect(Collectors.groupingBy(TxnWriteDetails::getTxnId))); + } List<String> queries = new ArrayList<>(); @@ -595,8 +597,8 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException { if (transactionalListeners != null) { for (Long txnId : txnIds) { - notifyCommitOrAbortEvent(txnId,EventMessage.EventType.ABORT_TXN, - nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), dbConn, txnWriteDetailsMap.get(txnId), transactionalListeners); + notifyCommitOrAbortEvent(txnId, EventMessage.EventType.ABORT_TXN, + nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), dbConn, txnWriteDetailsMap.getOrDefault(txnId, new ArrayList<>()), transactionalListeners); } } } catch (SQLException e) { @@ -826,7 +828,7 @@ public LockResponse checkLock(CheckLockRequest rqst) if (CollectionUtils.isEmpty(lockInfos)) { throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); } - LockInfo lockInfo = lockInfos.get(0); + LockInfo lockInfo = lockInfos.getFirst(); if (lockInfo.getTxnId() > 0) { new HeartbeatTxnFunction(lockInfo.getTxnId()).execute(jdbcResource); } else { @@ -1138,7 +1140,7 @@ private boolean checkIfTableIsUsable(String tableName, boolean configValue) { jdbcResource.getJdbcTemplate().query("SELECT 1 FROM \"" + tableName + "\"", new MapSqlParameterSource(), ResultSet::next); } catch (DataAccessException e) { - LOG.debug("Catching sql exception in " + tableName + " check", e); + LOG.debug("Catching sql exception in {} check", tableName, e); if (e.getCause() instanceof SQLException) { if (dbProduct.isTableNotExistsError(e)) { return false; @@ -1154,42 +1156,21 @@ private boolean checkIfTableIsUsable(String tableName, boolean configValue) { } /** - * Returns the databases updated by txnId. - * Queries TXN_TO_WRITE_ID using txnId. + * Returns the TxnWriteDetails updated by txnIds. + * Queries TXN_TO_WRITE_ID using txnIds. * - * @param txnId - * @throws MetaException + * @param txnIds Transaction IDs for which write IDs are requested. + * @throws MetaException throws MetaException */ - private List<String> getTxnDbsUpdated(long txnId) throws MetaException { + private List<TxnWriteDetails> getWriteIdsMappingForTxns(Set<Long> txnIds) throws MetaException { try { return sqlRetryHandler.executeWithRetry( - new SqlRetryCallProperties().withCallerId("GetTxnDbsUpdatedHandler"), - () -> jdbcResource.execute(new GetTxnDbsUpdatedHandler(txnId))); + new SqlRetryCallProperties().withCallerId("GetWriteIdsMappingForTxnIdsHandler"), + () -> jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(txnIds))); } catch (MetaException e) { throw e; } catch (TException e) { throw new MetaException(e.getMessage()); } } - - /** - * Returns the databases and writeID updated by txnId. - * Queries TXN_TO_WRITE_ID using txnId. - * - * @param txnId Transaction ID for which write IDs are requested. - * @throws MetaException - */ - public List<TxnWriteDetails> getWriteIdsForTxnID(long txnId) throws MetaException { - try { - return sqlRetryHandler.executeWithRetry( - new SqlRetryCallProperties().withCallerId("GetWriteIdsForTxnIDHandler"), - () -> jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnId))); - } catch (MetaException e) { - throw e; - } catch (TException e) { - throw new MetaException(e.getMessage()); - } - } - - } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java index ac90f5ce61e..bcd5226e014 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent; -import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; @@ -50,7 +49,7 @@ import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetCompactionInfoHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetHighWaterMarkHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetOpenTxnTypeAndLockHandler; -import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler; +import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.queries.TargetTxnIdListHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.jdbc.RollbackException; @@ -71,6 +70,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -103,7 +103,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce List<TxnWriteDetails> txnWriteDetails = new ArrayList<>(); if (!isHiveReplTxn) { - txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(rqst.getTxnid())); + txnWriteDetails = jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid()))); } // Get the current TXN @@ -585,7 +585,7 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc } /** - * Create Notifiaction Events on txn commit + * Create Notification Events on txn commit * * @param txnid committed txn * @param txnType transaction type diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java index c70985ec544..f6af1251483 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hive.metastore.txn.jdbc.functions; import org.apache.hadoop.hive.metastore.DatabaseProduct; -import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; @@ -33,8 +31,7 @@ import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext; import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction; -import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetTxnDbsUpdatedHandler; -import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler; +import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; @@ -46,6 +43,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent; import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; @@ -131,10 +129,13 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) { //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout' LOG.info("Aborted the following transactions due to timeout: {}", batchToAbort); if (transactionalListeners != null) { + List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(batchToAbort.keySet())); + Map<Long, List<TxnWriteDetails>> txnWriteDetailsMap = + txnWriteDetails.stream() + .collect(Collectors.groupingBy(TxnWriteDetails::getTxnId)); for (Map.Entry<Long, TxnType> txnEntry : batchToAbort.entrySet()) { - List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnEntry.getKey())); notifyCommitOrAbortEvent(txnEntry.getKey(), EventMessage.EventType.ABORT_TXN , txnEntry.getValue(), - jdbcResource.getConnection(), txnWriteDetails, transactionalListeners); + jdbcResource.getConnection(), txnWriteDetailsMap.getOrDefault(txnEntry.getKey(), new ArrayList<>()), transactionalListeners); } LOG.debug("Added Notifications for the transactions that are aborted due to timeout: {}", batchToAbort); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java deleted file mode 100644 index 4600064afc3..00000000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn.jdbc.queries; - -import org.apache.hadoop.hive.metastore.DatabaseProduct; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler; -import org.springframework.dao.DataAccessException; -import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; -import org.springframework.jdbc.core.namedparam.SqlParameterSource; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; - -/** - * Returns the databases updated by txnId. - * Queries TXN_TO_WRITE_ID using txnId. - */ -public class GetTxnDbsUpdatedHandler implements QueryHandler<List<String>> { - - private final long txnId; - - public GetTxnDbsUpdatedHandler(long txnId) { - this.txnId = txnId; - } - - @Override - public String getParameterizedQueryString(DatabaseProduct databaseProduct) throws MetaException { - return "SELECT DISTINCT \"T2W_DATABASE\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId"; - } - - @Override - public SqlParameterSource getQueryParameters() { - return new MapSqlParameterSource().addValue("txnId", txnId); - } - - @Override - public List<String> extractData(ResultSet rs) throws SQLException, DataAccessException { - List<String> dbsUpdated = new ArrayList<>(); - while (rs.next()) { - dbsUpdated.add(rs.getString(1)); - } - return dbsUpdated; - } -} \ No newline at end of file diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java similarity index 75% rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java index c3a69fc52b0..4d680c981ad 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java @@ -27,39 +27,36 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.List; +import java.util.Set; -/** - * Returns the databases and writeID updated by txnId. - * Queries TXN_TO_WRITE_ID using txnId. - */ -public class GetWriteIdsForTxnIDHandler implements QueryHandler<List<TxnWriteDetails>> { +public class GetWriteIdsMappingForTxnIdsHandler implements QueryHandler<List<TxnWriteDetails>> { - private final long txnId; + private final Set<Long> txnIds; - public GetWriteIdsForTxnIDHandler(long txnId) { - this.txnId = txnId; + public GetWriteIdsMappingForTxnIdsHandler(Set<Long> txnIds) { + this.txnIds= txnIds; } @Override public String getParameterizedQueryString(DatabaseProduct databaseProduct) throws MetaException { - return "SELECT DISTINCT \"T2W_DATABASE\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId"; + return "SELECT DISTINCT \"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" IN (:txnIds)"; } @Override public SqlParameterSource getQueryParameters() { - return new MapSqlParameterSource().addValue("txnId", txnId); + return new MapSqlParameterSource().addValue("txnIds", txnIds, Types.BIGINT); } @Override public List<TxnWriteDetails> extractData(ResultSet rs) throws SQLException, DataAccessException { List<TxnWriteDetails> dbsUpdated = new ArrayList<>(); while (rs.next()) { - TxnWriteDetails entry = new TxnWriteDetails(txnId, rs.getString(1), rs.getLong(2)); + TxnWriteDetails entry = new TxnWriteDetails(rs.getLong(1), rs.getString(2), rs.getLong(3)); dbsUpdated.add(entry); } return dbsUpdated; } } -