This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6af511e53df HIVE-27797: Transactions that got timed out are not getting logged as 'ABORTED' in NOTIFICATION_LOG (Taraka Rama Rao Lethavadla, reviewed by Denys Kuzmenko)
6af511e53df is described below

commit 6af511e53dfa71f3e5c80b4abab210da29033e57
Author: tarak271 <[email protected]>
AuthorDate: Wed Dec 6 18:37:21 2023 +0530

    HIVE-27797: Transactions that got timed out are not getting logged as 'ABORTED' in NOTIFICATION_LOG (Taraka Rama Rao Lethavadla, reviewed by Denys Kuzmenko)
    
    Closes #4805
---
 .../parse/TestTimedOutTxnNotificationLogging.java  | 201 +++++++++++++++++++++
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  27 ++-
 2 files changed, 220 insertions(+), 8 deletions(-)
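
For context, the following is a minimal, self-contained sketch (JDK types only) of the data-structure change the TxnHandler hunk below makes: timed-out transactions are now collected as (txnId -> TxnType) maps and processed in fixed-size batches, so that after each batch is aborted a typed ABORT_TXN notification can be emitted per transaction. The enum, the batch-size value, and the method names here are illustrative stand-ins, not Hive APIs.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimedOutTxnBatchingSketch {

  // Illustrative stand-in for org.apache.hadoop.hive.metastore.api.TxnType.
  enum TxnType { DEFAULT, REPL_CREATED, READ_ONLY }

  // Illustrative batch size; the real constant lives in TxnHandler.
  static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 2;

  // Mirrors the List<Map<Long, TxnType>> batching the patch introduces: the abort
  // loop still works in fixed-size batches but keeps each transaction's type.
  static List<Map<Long, TxnType>> toBatches(Map<Long, TxnType> timedOut) {
    List<Map<Long, TxnType>> batches = new ArrayList<>();
    Map<Long, TxnType> current = new LinkedHashMap<>();
    for (Map.Entry<Long, TxnType> e : timedOut.entrySet()) {
      current.put(e.getKey(), e.getValue());
      if (current.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
        batches.add(current);
        current = new LinkedHashMap<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    Map<Long, TxnType> timedOut = new LinkedHashMap<>();
    timedOut.put(101L, TxnType.DEFAULT);
    timedOut.put(102L, TxnType.DEFAULT);
    timedOut.put(103L, TxnType.READ_ONLY);
    for (Map<Long, TxnType> batch : toBatches(timedOut)) {
      // In the real patch: abort the batch's txn ids, then send one ABORT_TXN
      // notification per (txnId, txnType) entry to the transactional listeners.
      System.out.println("abort + notify: " + batch);
    }
  }
}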

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
new file mode 100644
index 00000000000..acad300c681
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.repl.ReplScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestTimedOutTxnNotificationLogging {
+
+  private HiveConf hiveConf;
+
+  private static IMetaStoreClient hive;
+
+  @Parameterized.Parameter
+  public int numberOfTxns;
+
+  @Parameterized.Parameter(1)
+  public TxnType txnType;
+
+  @Parameterized.Parameter(2)
+  public int expectedNotifications;
+
+  @Parameterized.Parameters(name = "{index}: numberOfTxns={0},txnType={1},expectedNotifications={2}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[][] { { 3, TxnType.REPL_CREATED, 3 }, { 3, TxnType.DEFAULT, 3 }, { 3, TxnType.READ_ONLY, 0 } });
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    setConf();
+    TestTxnDbUtil.prepDb(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    hive = new HiveMetaStoreClient(hiveConf);
+  }
+
+  private void setConf() {
+    hiveConf = new HiveConf();
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.WAREHOUSE, "/tmp");
+    MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 1, TimeUnit.SECONDS);
+    HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        SQLStdHiveAuthorizerFactory.class.getName());
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
+        DbNotificationListener.class.getName());
+    MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 10,
+        TimeUnit.MILLISECONDS);
+    MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0,
+        TimeUnit.SECONDS);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TestTxnDbUtil.cleanDb(hiveConf);
+    if (hive != null) {
+      hive.close();
+    }
+    SessionState.get().close();
+    hiveConf = null;
+  }
+
+  @Test
+  public void testTxnNotificationLogging() throws Exception {
+    try {
+      List<Long> txnIds = openTxns(numberOfTxns, txnType);
+      assertEquals(txnIds.size(), getNumberOfTxns(txnIds, TxnState.OPEN));
+      assertEquals(expectedNotifications, getNumNotifications(txnIds, MessageBuilder.OPEN_TXN_EVENT));
+      Thread.sleep(1000);
+      runHouseKeeperService();
+      if (txnType != TxnType.REPL_CREATED) {
+        assertEquals(txnIds.size(), getNumberOfTxns(txnIds, TxnState.ABORTED));
+        assertEquals(expectedNotifications, getNumNotifications(txnIds, MessageBuilder.ABORT_TXN_EVENT));
+      }
+    } finally {
+      runTxnHouseKeeperService();
+    }
+  }
+
+  private int getNumNotifications(List<Long> txnIds, String eventType) throws IOException, TException {
+    int numNotifications = 0;
+    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(new ReplEventFilter(new ReplScope()),
+        new CatalogFilter(MetaStoreUtils.getDefaultCatalog(hiveConf)), new EventBoundaryFilter(0, 100));
+    NotificationEventResponse rsp = hive.getNextNotification(new NotificationEventRequest(), true, evFilter);
+    if (rsp.getEvents() == null) {
+      return numNotifications;
+    }
+    Iterator<NotificationEvent> eventIterator = rsp.getEvents().iterator();
+    MessageDeserializer deserializer = null;
+    while (eventIterator.hasNext()) {
+      NotificationEvent ev = eventIterator.next();
+      if (eventType.equals(ev.getEventType())) {
+        deserializer = ReplUtils.getEventDeserializer(ev);
+        switch (ev.getEventType()) {
+        case MessageBuilder.OPEN_TXN_EVENT:
+          OpenTxnMessage openTxnMessage = deserializer.getOpenTxnMessage(ev.getMessage());
+          if (txnIds.contains(openTxnMessage.getTxnIds().get(0))) {
+            numNotifications++;
+          }
+          break;
+        case MessageBuilder.ABORT_TXN_EVENT:
+          AbortTxnMessage abortTxnMessage = deserializer.getAbortTxnMessage(ev.getMessage());
+          if (txnIds.contains(abortTxnMessage.getTxnId())) {
+            numNotifications++;
+          }
+        }
+      }
+    }
+    return numNotifications;
+  }
+
+  private List<Long> openTxns(int txnCounter, TxnType txnType) throws TException {
+    List<Long> txnIds = new LinkedList<>();
+    for (; txnCounter > 0; txnCounter--) {
+      if (txnType == TxnType.REPL_CREATED) {
+        Long srcTxn = (long) (11 + txnCounter);
+        List<Long> srcTxns = Arrays.asList(new Long[] { srcTxn });
+        txnIds.addAll(hive.replOpenTxn("testPolicy", srcTxns, "hive", txnType));
+      } else {
+        txnIds.add(hive.openTxn("hive", txnType));
+      }
+    }
+    return txnIds;
+  }
+
+  private int getNumberOfTxns(List<Long> txnIds, TxnState txnState) throws TException {
+    AtomicInteger numTxns = new AtomicInteger();
+    hive.showTxns().getOpen_txns().forEach(txnInfo -> {
+      if (txnInfo.getState() == txnState && txnIds.contains(txnInfo.getId())) {
+        numTxns.incrementAndGet();
+      }
+    });
+    return numTxns.get();
+  }
+
+  private void runHouseKeeperService() {
+    MetastoreTaskThread acidHouseKeeperService = new AcidHouseKeeperService();
+    acidHouseKeeperService.setConf(hiveConf);
+    acidHouseKeeperService.run(); //this will abort timedout txns
+  }
+
+  private void runTxnHouseKeeperService() {
+    MetastoreTaskThread acidTxnCleanerService = new AcidTxnCleanerService();
+    acidTxnCleanerService.setConf(hiveConf);
+    acidTxnCleanerService.run(); //this will remove empty aborted txns
+  }
+}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c661e34b073..8905a01490e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -5799,7 +5799,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       //timely way.
       timeOutLocks();
       while (true) {
-        String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN +
+        String s = " \"TXN_ID\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN +
             " AND (" +
             "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
             " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + "-" + 
timeout +
@@ -5811,14 +5811,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
 
         LOG.debug("Going to execute query <{}>", s);
-        List<List<Long>> timedOutTxns = jdbcResource.getJdbcTemplate().query(s, rs -> {
-          List<List<Long>> txnbatch = new ArrayList<>();
-          List<Long> currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+        List<Map<Long, TxnType>> timedOutTxns = jdbcResource.getJdbcTemplate().query(s, rs -> {
+          List<Map<Long, TxnType>> txnbatch = new ArrayList<>();
+          Map<Long, TxnType> currentBatch = new HashMap<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
           while (rs.next()) {
-            currentBatch.add(rs.getLong(1));
+            currentBatch.put(rs.getLong(1),TxnType.findByValue(rs.getInt(2)));
             if (currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
               txnbatch.add(currentBatch);
-              currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+              currentBatch = new HashMap<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
             }
           }
           if (currentBatch.size() > 0) {
@@ -5835,12 +5835,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         Object savePoint = context.getTransactionStatus().createSavepoint();
 
         int numTxnsAborted = 0;
-        for (List<Long> batchToAbort : timedOutTxns) {
+        for (Map<Long, TxnType> batchToAbort : timedOutTxns) {
           context.getTransactionStatus().releaseSavepoint(savePoint);
           savePoint = context.getTransactionStatus().createSavepoint();
-          if (abortTxns(jdbcResource.getConnection(), batchToAbort, true, false, false, TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
+          if (abortTxns(jdbcResource.getConnection(), new ArrayList<>(batchToAbort.keySet()), true, false, false, TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
             numTxnsAborted += batchToAbort.size();
             //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
+            LOG.info("Aborted the following transactions due to timeout: {}", batchToAbort);
+            if (transactionalListeners != null) {
+              for (Map.Entry<Long, TxnType> txnEntry : batchToAbort.entrySet()) {
+                List<String> dbsUpdated = getTxnDbsUpdated(txnEntry.getKey(), jdbcResource.getConnection());
+                MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                    EventMessage.EventType.ABORT_TXN,
+                    new AbortTxnEvent(txnEntry.getKey(), txnEntry.getValue(), null, dbsUpdated),
+                    jdbcResource.getConnection(), sqlGenerator);
+              }
+              LOG.debug("Added Notifications for the transactions that are aborted due to timeout: {}", batchToAbort);
+            }
           } else {
             //could not abort all txns in this batch - this may happen because in parallel with this
             //operation there was activity on one of the txns in this batch (commit/abort/heartbeat)
