This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6af511e53df HIVE-27797: Transactions that got timed out are not
getting logged as 'ABORTED' in NOTIFICATION_LOG (Taraka Rama Rao Lethavadla,
reviewed by Denys Kuzmenko)
6af511e53df is described below
commit 6af511e53dfa71f3e5c80b4abab210da29033e57
Author: tarak271 <[email protected]>
AuthorDate: Wed Dec 6 18:37:21 2023 +0530
HIVE-27797: Transactions that got timed out are not getting logged as
'ABORTED' in NOTIFICATION_LOG (Taraka Rama Rao Lethavadla, reviewed by Denys
Kuzmenko)
Closes #4805
---
.../parse/TestTimedOutTxnNotificationLogging.java | 201 +++++++++++++++++++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 27 ++-
2 files changed, 220 insertions(+), 8 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
new file mode 100644
index 00000000000..acad300c681
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.repl.ReplScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
+import
org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import
org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
+import
org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestTimedOutTxnNotificationLogging {
+
+ private HiveConf hiveConf;
+
+ private static IMetaStoreClient hive;
+
+ @Parameterized.Parameter
+ public int numberOfTxns;
+
+ @Parameterized.Parameter(1)
+ public TxnType txnType;
+
+ @Parameterized.Parameter(2)
+ public int expectedNotifications;
+
+ @Parameterized.Parameters(name = "{index}:
numberOfTxns={0},txnType={1},expectedNotifications={2}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] { { 3, TxnType.REPL_CREATED, 3 }, { 3, TxnType.DEFAULT,
3 }, { 3, TxnType.READ_ONLY, 0 } });
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ setConf();
+ TestTxnDbUtil.prepDb(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ hive = new HiveMetaStoreClient(hiveConf);
+ }
+
+ private void setConf() {
+ hiveConf = new HiveConf();
+ MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST,
true);
+ MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.WAREHOUSE, "/tmp");
+ MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_TIMEOUT,
1000, TimeUnit.MILLISECONDS);
+ MetastoreConf.setTimeVar(hiveConf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 1, TimeUnit.SECONDS);
+ HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ SQLStdHiveAuthorizerFactory.class.getName());
+ MetastoreConf.setVar(hiveConf,
MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
+ DbNotificationListener.class.getName());
+ MetastoreConf.setTimeVar(hiveConf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 10,
+ TimeUnit.MILLISECONDS);
+ MetastoreConf.setTimeVar(hiveConf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0,
+ TimeUnit.SECONDS);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TestTxnDbUtil.cleanDb(hiveConf);
+ if (hive != null) {
+ hive.close();
+ }
+ SessionState.get().close();
+ hiveConf = null;
+ }
+
+ @Test
+ public void testTxnNotificationLogging() throws Exception {
+ try {
+ List<Long> txnIds = openTxns(numberOfTxns, txnType);
+ assertEquals(txnIds.size(), getNumberOfTxns(txnIds, TxnState.OPEN));
+ assertEquals(expectedNotifications, getNumNotifications(txnIds,
MessageBuilder.OPEN_TXN_EVENT));
+ Thread.sleep(1000);
+ runHouseKeeperService();
+ if (txnType != TxnType.REPL_CREATED) {
+ assertEquals(txnIds.size(), getNumberOfTxns(txnIds, TxnState.ABORTED));
+ assertEquals(expectedNotifications, getNumNotifications(txnIds,
MessageBuilder.ABORT_TXN_EVENT));
+ }
+ } finally {
+ runTxnHouseKeeperService();
+ }
+ }
+
+ private int getNumNotifications(List<Long> txnIds, String eventType) throws
IOException, TException {
+ int numNotifications = 0;
+ IMetaStoreClient.NotificationFilter evFilter = new AndFilter(new
ReplEventFilter(new ReplScope()),
+ new CatalogFilter(MetaStoreUtils.getDefaultCatalog(hiveConf)), new
EventBoundaryFilter(0, 100));
+ NotificationEventResponse rsp = hive.getNextNotification(new
NotificationEventRequest(), true, evFilter);
+ if (rsp.getEvents() == null) {
+ return numNotifications;
+ }
+ Iterator<NotificationEvent> eventIterator = rsp.getEvents().iterator();
+ MessageDeserializer deserializer = null;
+ while (eventIterator.hasNext()) {
+ NotificationEvent ev = eventIterator.next();
+ if (eventType.equals(ev.getEventType())) {
+ deserializer = ReplUtils.getEventDeserializer(ev);
+ switch (ev.getEventType()) {
+ case MessageBuilder.OPEN_TXN_EVENT:
+ OpenTxnMessage openTxnMessage =
deserializer.getOpenTxnMessage(ev.getMessage());
+ if (txnIds.contains(openTxnMessage.getTxnIds().get(0))) {
+ numNotifications++;
+ }
+ break;
+ case MessageBuilder.ABORT_TXN_EVENT:
+ AbortTxnMessage abortTxnMessage =
deserializer.getAbortTxnMessage(ev.getMessage());
+ if (txnIds.contains(abortTxnMessage.getTxnId())) {
+ numNotifications++;
+ }
+ }
+ }
+ }
+ return numNotifications;
+ }
+
+ private List<Long> openTxns(int txnCounter, TxnType txnType) throws
TException {
+ List<Long> txnIds = new LinkedList<>();
+ for (; txnCounter > 0; txnCounter--) {
+ if (txnType == TxnType.REPL_CREATED) {
+ Long srcTxn = (long) (11 + txnCounter);
+ List<Long> srcTxns = Arrays.asList(new Long[] { srcTxn });
+ txnIds.addAll(hive.replOpenTxn("testPolicy", srcTxns, "hive",
txnType));
+ } else {
+ txnIds.add(hive.openTxn("hive", txnType));
+ }
+ }
+ return txnIds;
+ }
+
+ private int getNumberOfTxns(List<Long> txnIds, TxnState txnState) throws
TException {
+ AtomicInteger numTxns = new AtomicInteger();
+ hive.showTxns().getOpen_txns().forEach(txnInfo -> {
+ if (txnInfo.getState() == txnState && txnIds.contains(txnInfo.getId())) {
+ numTxns.incrementAndGet();
+ }
+ });
+ return numTxns.get();
+ }
+
+ private void runHouseKeeperService() {
+ MetastoreTaskThread acidHouseKeeperService = new AcidHouseKeeperService();
+ acidHouseKeeperService.setConf(hiveConf);
+ acidHouseKeeperService.run(); //this will abort timedout txns
+ }
+
+ private void runTxnHouseKeeperService() {
+ MetastoreTaskThread acidTxnCleanerService = new AcidTxnCleanerService();
+ acidTxnCleanerService.setConf(hiveConf);
+ acidTxnCleanerService.run(); //this will remove empty aborted txns
+ }
+}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c661e34b073..8905a01490e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -5799,7 +5799,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//timely way.
timeOutLocks();
while (true) {
- String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " +
TxnStatus.OPEN +
+ String s = " \"TXN_ID\", \"TXN_TYPE\" FROM \"TXNS\" WHERE
\"TXN_STATE\" = " + TxnStatus.OPEN +
" AND (" +
"\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
" AND \"TXN_LAST_HEARTBEAT\" < " + getEpochFn(dbProduct) + "-" +
timeout +
@@ -5811,14 +5811,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE,
s);
LOG.debug("Going to execute query <{}>", s);
- List<List<Long>> timedOutTxns =
jdbcResource.getJdbcTemplate().query(s, rs -> {
- List<List<Long>> txnbatch = new ArrayList<>();
- List<Long> currentBatch = new
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+ List<Map<Long, TxnType>> timedOutTxns =
jdbcResource.getJdbcTemplate().query(s, rs -> {
+ List<Map<Long, TxnType>> txnbatch = new ArrayList<>();
+ Map<Long, TxnType> currentBatch = new
HashMap<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
while (rs.next()) {
- currentBatch.add(rs.getLong(1));
+ currentBatch.put(rs.getLong(1),TxnType.findByValue(rs.getInt(2)));
if (currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
txnbatch.add(currentBatch);
- currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+ currentBatch = new HashMap<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
}
}
if (currentBatch.size() > 0) {
@@ -5835,12 +5835,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Object savePoint = context.getTransactionStatus().createSavepoint();
int numTxnsAborted = 0;
- for (List<Long> batchToAbort : timedOutTxns) {
+ for (Map<Long, TxnType> batchToAbort : timedOutTxns) {
context.getTransactionStatus().releaseSavepoint(savePoint);
savePoint = context.getTransactionStatus().createSavepoint();
- if (abortTxns(jdbcResource.getConnection(), batchToAbort, true,
false, false, TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
+ if (abortTxns(jdbcResource.getConnection(), new
ArrayList<>(batchToAbort.keySet()), true, false, false,
TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
numTxnsAborted += batchToAbort.size();
//todo: add TXNS.COMMENT filed and set it to 'aborted by system
due to timeout'
+ LOG.info("Aborted the following transactions due to timeout: {}",
batchToAbort);
+ if (transactionalListeners != null) {
+ for (Map.Entry<Long, TxnType> txnEntry :
batchToAbort.entrySet()) {
+ List<String> dbsUpdated = getTxnDbsUpdated(txnEntry.getKey(),
jdbcResource.getConnection());
+
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+ EventMessage.EventType.ABORT_TXN,
+ new AbortTxnEvent(txnEntry.getKey(), txnEntry.getValue(),
null, dbsUpdated),
+ jdbcResource.getConnection(), sqlGenerator);
+ }
+ LOG.debug("Added Notifications for the transactions that are
aborted due to timeout: {}", batchToAbort);
+ }
} else {
//could not abort all txns in this batch - this may happen because
in parallel with this
//operation there was activity on one of the txns in this batch
(commit/abort/heartbeat)