This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bdd7e101 IMPALA-13518: Show target name of COMMIT_TXN events in logs
7bdd7e101 is described below

commit 7bdd7e10101b33d63f3547fa2ec7aa3015ef24cf
Author: stiga-huang <[email protected]>
AuthorDate: Wed Nov 6 15:14:38 2024 +0800

    IMPALA-13518: Show target name of COMMIT_TXN events in logs
    
    The message of a COMMIT_TXN event just contains the transaction id
    (txnid). In the logs of top-10 expensive events and top-10 targets that
    contribute to the lag, we show the target as CLUSTER_WIDE.
    
    However, when processing the events, catalogd actually finds
    the involved tables and reloads them. It'd be helpful to show the names
    of the tables involved in the transaction.
    
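    This patch overrides the getTargetName() method in CommitTxnEvent to
    show the table names. The names are collected while the event is
    processed, so they are only available after process() runs; until then
    (or if the transaction touched no tables) the target is still shown as
    CLUSTER_WIDE.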
    
    Tests:
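     - Add testCommitTxnEventTargetName in MetastoreEventsProcessorTest,
       covering COMMIT_TXN events with no tables, one non-partitioned
       table, one partitioned table, and multiple tables.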
    
    Change-Id: I4a7cb5e716453290866a4c3e74c0d269f621144f
    Reviewed-on: http://gerrit.cloudera.org:8080/22036
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Reviewed-by: Sai Hemanth Gantasala <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  10 ++
 .../impala/catalog/events/MetastoreEvents.java     |   4 +-
 .../events/MetastoreEventsProcessorTest.java       | 104 +++++++++++++++++++++
 3 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 03feafe22..5d199174b 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -833,6 +834,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     private final CommitTxnMessage commitTxnMessage_;
     private final long txnId_;
     private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
+    private final Set<String> tableNames_ = new HashSet<>();
 
     public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) {
@@ -846,6 +848,12 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
           txnId_);
     }
 
+    @Override
+    public String getTargetName() {
+      if (tableNames_.isEmpty()) return CLUSTER_WIDE_TARGET;
+      return tableNames_.stream().sorted().collect(Collectors.joining(","));
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
       // To ensure no memory leaking in case an exception is thrown, we remove entries
@@ -907,6 +915,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
             getEventId(),
             Collections.singletonList(tableWriteId.getWriteId()),
             MutableValidWriteIdList.WriteIdStatus.COMMITTED);
+        tableNames_.add(tableWriteId.getDbName() + "." + tableWriteId.getTblName());
       }
     }
 
@@ -921,6 +930,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         TableName tableName = new TableName(tbl.getDbName(), tbl.getTableName());
         parts.add(commitTxnMessage_.getPartitionObj(i));
         tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
+        tableNames_.add(tableName.toString());
       }
       for (Map.Entry<TableName, List<Integer>> entry : tableNameToIdxs.entrySet()) {
         org.apache.hadoop.hive.metastore.api.Table tbl =
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 10d799426..c2612a878 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -585,6 +585,8 @@ public class MetastoreEvents {
     // logger format compatible string to prepend to a log formatted message
     private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
 
+    protected static final String CLUSTER_WIDE_TARGET = "CLUSTER_WIDE";
+
     // CatalogServiceCatalog instance on which the event needs to be acted upon
     protected final CatalogServiceCatalog catalog_;
 
@@ -655,7 +657,7 @@ public class MetastoreEvents {
     public String getTableName() { return tblName_; }
 
     public String getTargetName() {
-      if (dbName_ == null && tblName_ == null) return "CLUSTER_WIDE";
+      if (dbName_ == null && tblName_ == null) return CLUSTER_WIDE_TARGET;
       if (tblName_ == null) return dbName_;
       return dbName_ + "." + tblName_;
     }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index db35349d3..e8621a075 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -4261,6 +4261,110 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testCommitTxnEventTargetName() throws Exception {
+    String tblName = "test_commit_txn";
+    String partTblName = "test_commit_txn_part";
+    String insertNonPartTbl =
+        "insert into table " + TEST_DB_NAME + '.' + tblName + " values('a', 'b')";
+    String insertPartTbl = String.format(
+        "insert into table %s.%s partition(p1='a') values('a', 'b')",
+        TEST_DB_NAME, partTblName);
+    createDatabase(TEST_DB_NAME, null);
+    createTransactionalTable(TEST_DB_NAME, tblName, false);
+    createTransactionalTable(TEST_DB_NAME, partTblName, true);
+    eventsProcessor_.processEvents();
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      // 1. Test on an empty COMMIT_TXN event
+      long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      // OPEN_TXN event is filtered out. So we will just receive one event of COMMIT_TXN
+      List<NotificationEvent> hmsEvents = eventsProcessor_.getNextMetastoreEvents();
+      assertEquals("COMMIT_TXN", hmsEvents.get(0).getEventType());
+      List<MetastoreEvent> filteredEvents =
+          eventsProcessor_.getEventsFactory().getFilteredEvents(
+              hmsEvents, eventsProcessor_.getMetrics());
+
+      filteredEvents.get(0).process();
+      assertEquals(MetastoreEvent.CLUSTER_WIDE_TARGET,
+          filteredEvents.get(0).getTargetName());
+      eventsProcessor_.start(filteredEvents.get(0).getEventId());
+
+      // 2. COMMIT_TXN event with one non-partitioned table
+      try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1);
+           HiveJdbcClientPool.HiveJdbcClient hiveClient = jdbcClientPool.getClient()) {
+        hiveClient.executeSql(insertNonPartTbl);
+      }
+      hmsEvents = eventsProcessor_.getNextMetastoreEvents();
+      assertEquals(3, hmsEvents.size());
+      assertEquals("ALLOC_WRITE_ID_EVENT", hmsEvents.get(0).getEventType());
+      assertEquals("ALTER_TABLE", hmsEvents.get(1).getEventType());
+      assertEquals("COMMIT_TXN", hmsEvents.get(2).getEventType());
+      filteredEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(
+          hmsEvents, eventsProcessor_.getMetrics());
+      filteredEvents.get(0).process();
+      filteredEvents.get(1).process();
+      filteredEvents.get(2).process();
+      assertEquals(TEST_DB_NAME + "." + tblName, filteredEvents.get(2).getTargetName());
+      eventsProcessor_.start(filteredEvents.get(2).getEventId());
+
+      // 3. COMMIT_TXN event with one partitioned table
+      try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1);
+           HiveJdbcClientPool.HiveJdbcClient hiveClient = jdbcClientPool.getClient()) {
+        hiveClient.executeSql(insertPartTbl);
+      }
+      hmsEvents = eventsProcessor_.getNextMetastoreEvents();
+      assertEquals(3, hmsEvents.size());
+      assertEquals("ALLOC_WRITE_ID_EVENT", hmsEvents.get(0).getEventType());
+      assertEquals("ADD_PARTITION", hmsEvents.get(1).getEventType());
+      assertEquals("COMMIT_TXN", hmsEvents.get(2).getEventType());
+      filteredEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(
+          hmsEvents, eventsProcessor_.getMetrics());
+      filteredEvents.get(0).process();
+      filteredEvents.get(1).process();
+      filteredEvents.get(2).process();
+      assertEquals(TEST_DB_NAME + "." + partTblName,
+          filteredEvents.get(2).getTargetName());
+      eventsProcessor_.start(filteredEvents.get(2).getEventId());
+
+      // 4. COMMIT_TXN event with multiple tables
+      txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      // Insert the non-partitioned table
+      long writeId = MetastoreShim.allocateTableWriteId(client.getHiveClient(),
+          txnId, TEST_DB_NAME, tblName);
+      loadTable(tblName);
+      HdfsTable tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, tblName);
+      simulateInsertIntoTransactionalTableFromFS(
+          tbl.getMetaStoreTable(), null, 1, txnId, writeId);
+      // Insert the partitioned table
+      writeId = MetastoreShim.allocateTableWriteId(client.getHiveClient(),
+          txnId, TEST_DB_NAME, partTblName);
+      loadTable(partTblName);
+      tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, partTblName);
+      Partition partition = client.getHiveClient().getPartition(
+            TEST_DB_NAME, partTblName, Arrays.asList("a"));
+      simulateInsertIntoTransactionalTableFromFS(
+          tbl.getMetaStoreTable(), partition, 1, txnId, writeId);
+      MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      // simulateInsertIntoTransactionalTableFromFS() won't generate ALTER events.
+      // So we just have ALLOC_WRITE_ID_EVENT and COMMIT_TXN events.
+      hmsEvents = eventsProcessor_.getNextMetastoreEvents();
+      assertEquals(3, hmsEvents.size());
+      assertEquals("ALLOC_WRITE_ID_EVENT", hmsEvents.get(0).getEventType());
+      assertEquals("ALLOC_WRITE_ID_EVENT", hmsEvents.get(1).getEventType());
+      assertEquals("COMMIT_TXN", hmsEvents.get(2).getEventType());
+      filteredEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(
+          hmsEvents, eventsProcessor_.getMetrics());
+      for (int i = 0; i < 3; i++) {
+        filteredEvents.get(i).process();
+      }
+      assertEquals(
+          TEST_DB_NAME + "." + tblName + "," + TEST_DB_NAME + "." + partTblName,
+          filteredEvents.get(2).getTargetName());
+      eventsProcessor_.start(filteredEvents.get(2).getEventId());
+    }
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {

Reply via email to