This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 50926b5d8e941c5cc10fd77d0b4556e3441c41e7
Author: Sai Hemanth Gantasala <saihema...@cloudera.com>
AuthorDate: Thu Feb 29 15:46:45 2024 -0800

    IMPALA-12829: Skip processing transaction events if the table is
    HMS sync disabled.
    
    For transactional tables, the event processor does not skip abort_txn
    and commit_txn events when HMS sync is disabled for the database/table.
    This processing is unnecessary, so skipping abort_txn and commit_txn
    events when the corresponding database or transactional table has HMS
    sync disabled helps reduce event processor lag. The database and table
    names are present in the alloc_write_id event, and skipping that event
    when HMS sync is disabled is already implemented. Since the database
    and table names are not present in abort_txn and commit_txn events, we
    need to check whether HMS sync is disabled via the table (or database)
    property when the table object is extracted in
    CatalogServiceCatalog#addWriteIdsToTable().
    Also fixed the partitions-refreshed and tables-refreshed metrics for
    CommitTxn events.
    
    Additional Issues discovered during testing:
    1) CatalogServiceCatalog#reloadTableIfExists() didn't check whether the
    current eventId is older than the table's lastSyncedEventId, which led
    to unnecessary table reloads for commit txn events.
    2) Insert queries from Impala didn't update the validWriteIdList of
    transactional tables in the cache, so consuming the CommitTxn events
    triggered by those inserts reloaded unpartitioned transactional tables
    again. Fixed by updating the validWriteIdList in the cache.
    3) CommitTxn events generated after AlterTable events led to incorrect
    results when the file metadata reload was skipped for the AlterTable
    events. This is because an AlterTable event updates the writeId from
    the metastore but doesn't reload file metadata, which yields incorrect
    results. Fixed in the HdfsTable class by not skipping the file metadata
    reload of unpartitioned tables when the writeId has changed.
    4) Increased timeouts in the TestEventProcessingWithImpala test class
    to avoid flakiness of transactional events in the event processor
    caused by the catalogd_load_metadata_delay config.
    
    Testing:
    - Added end-to-end tests to verify transaction events are skipped.
    
    Change-Id: I5d0ecb3b756755bc04c66a538a9ae6b88011a019
    Reviewed-on: http://gerrit.cloudera.org:8080/21175
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 31 ++++++--
 .../impala/catalog/CatalogServiceCatalog.java      | 59 ++++++++++++++-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 17 ++++-
 .../impala/catalog/events/MetastoreEvents.java     |  2 +-
 tests/custom_cluster/test_events_custom_configs.py | 85 +++++++++++++++++-----
 5 files changed, 162 insertions(+), 32 deletions(-)

diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 06bcd4514..e80669919 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -109,6 +109,7 @@ import org.apache.impala.catalog.TableNotLoadedException;
 import org.apache.impala.catalog.TableWriteId;
 import org.apache.impala.catalog.events.MetastoreEvents.DerivedMetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
+import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
@@ -961,6 +962,13 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       for (int i = 0; i < writeIds.size(); i++) {
         Table tbl = (Table) MessageBuilder.getTObj(
             writeEventInfoList.get(i).getTableObj(), Table.class);
+        if (catalog_.isHmsEventSyncDisabled(tbl)) {
+          LOG.debug("Not adding write ids to table {}.{} for event {} " +
+              "since table/db level flag {} is set to true", tbl.getDbName(),
+              tbl.getTableName(), getEventId(),
+              MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
+          continue;
+        }
         TableName tableName = new TableName(tbl.getDbName(), 
tbl.getTableName());
         Partition partition = null;
         if (writeEventInfoList.get(i).getPartitionObj() != null) {
@@ -983,7 +991,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         addCommittedWriteIdsAndReload(getCatalogOpExecutor(), tbl.getDbName(),
             tbl.getTableName(), tbl.getPartitionKeysSize() > 0,
             MetaStoreUtils.isMaterializedViewTable(tbl), writeIdsForTable, 
partsForTable,
-            getEventId());
+            getEventId(), getMetrics());
       }
     }
 
@@ -1010,12 +1018,17 @@ public class MetastoreShim extends 
Hive3MetastoreShimBase {
 
   public static void addCommittedWriteIdsAndReload(CatalogOpExecutor 
catalogOpExecutor,
       String dbName, String tableName, boolean isPartitioned, boolean 
isMaterializedView,
-      List<Long> writeIds, List<Partition> partitions, long eventId)
+      List<Long> writeIds, List<Partition> partitions, long eventId, Metrics 
metrics)
       throws CatalogException {
     if (isPartitioned && !isMaterializedView) {
       try {
-        
catalogOpExecutor.addCommittedWriteIdsAndReloadPartitionsIfExist(eventId, 
dbName,
-            tableName, writeIds, partitions, "COMMIT_TXN event " + eventId);
+        int numPartsRefreshed =
+            
catalogOpExecutor.addCommittedWriteIdsAndReloadPartitionsIfExist(eventId,
+                dbName, tableName, writeIds, partitions, "COMMIT_TXN event " + 
eventId);
+        if (numPartsRefreshed > 0) {
+          
metrics.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
+              .inc(numPartsRefreshed);
+        }
       } catch (TableNotLoadedException e) {
         LOG.debug("Ignoring reloading since table {}.{} is not loaded", dbName,
             tableName);
@@ -1023,9 +1036,13 @@ public class MetastoreShim extends 
Hive3MetastoreShimBase {
         LOG.debug("Ignoring reloading since table {}.{} is not found", dbName, 
tableName);
       }
     } else {
-      catalogOpExecutor.getCatalog()
+      boolean tableRefresh = catalogOpExecutor.getCatalog()
           .reloadTableIfExists(dbName, tableName, "COMMIT_TXN event " + 
eventId,
-              eventId, /*isSkipFileMetadataReload*/ false);
+              eventId, /*isSkipFileMetadataReload*/ false, writeIds);
+      if (tableRefresh) {
+        LOG.info("Refreshed table {}.{}", dbName, tableName);
+        
metrics.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).inc();
+      }
     }
   }
 
@@ -1077,7 +1094,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
       try {
         addCommittedWriteIdsAndReload(getCatalogOpExecutor(), dbName_, 
tblName_,
             isPartitioned_, isMaterializedView_, writeIdsInEvent_, partitions_,
-            getEventId());
+            getEventId(), getMetrics());
         catalog_.addWriteIdsToTable(getDbName(), getTableName(), getEventId(),
             writeIdsInCatalog_, 
MutableValidWriteIdList.WriteIdStatus.COMMITTED);
       } catch (CatalogException e) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 02257b4c9..9c76e622b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -80,6 +80,8 @@ import 
org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import 
org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
+import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
 import org.apache.impala.catalog.events.MetastoreNotificationFetchException;
@@ -3401,15 +3403,45 @@ public class CatalogServiceCatalog extends Catalog {
    * otherwise.
    */
   public boolean reloadTableIfExists(String dbName, String tblName, String 
reason,
-      long eventId, boolean isSkipFileMetadataReload) throws CatalogException {
+      long eventId, boolean isSkipFileMetadataReload, List<Long> 
committedWriteIds)
+      throws CatalogException {
     try {
       Table table = getTable(dbName, tblName);
       if (table == null || table instanceof IncompleteTable) return false;
-      if (eventId > 0 && eventId <= table.getCreateEventId()) {
-        LOG.debug("Not reloading the table {}.{} for event {} since it is 
recreated at "
-            + "event {}.", dbName, tblName, eventId, table.getCreateEventId());
+      if (eventId > 0 && eventId <= table.getCreateEventId() ||
+          (eventId != -1 && table.getLastSyncedEventId() != -1 &&
+              table.getLastSyncedEventId() >= eventId)) {
+        String message = (eventId > 0 && eventId <= table.getCreateEventId()) ?
+            "recreated at event " + table.getCreateEventId() :
+            "already synced till event id: " + table.getLastSyncedEventId();
+        LOG.debug("Not reloading the table {}.{} for event {} since it is 
{}.", dbName,
+            tblName, eventId, message);
         return false;
       }
+      // Transactional table events using this method should reload the table 
only if the
+      // incoming committedWriteIds are different from the committedWriteIds 
cached in
+      // CatalogD. The if block execution is skipped for non-transactional 
table events.
+      HdfsTable hdfsTable = (HdfsTable) table;
+      ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds();
+      if (previousWriteIdList != null && committedWriteIds != null
+          && !committedWriteIds.isEmpty()) {
+        // get a copy of previous write id list
+        boolean tableNeedsRefresh = false;
+        previousWriteIdList = MetastoreShim.getValidWriteIdListFromString(
+            previousWriteIdList.toString());
+        for (Long writeId : committedWriteIds) {
+          if (!previousWriteIdList.isWriteIdValid(writeId)) {
+            tableNeedsRefresh = true;
+            break;
+          }
+        }
+        if (!tableNeedsRefresh) {
+          LOG.info("Not reloading table {} for event {} since the cache is "
+              + "already up-to-date", table.getFullName(), eventId);
+          hdfsTable.setLastSyncedEventId(eventId);
+          return false;
+        }
+      }
       reloadTable(table, reason, eventId, isSkipFileMetadataReload,
           NoOpEventSequence.INSTANCE);
     } catch (DatabaseNotFoundException | TableLoadingException e) {
@@ -4748,6 +4780,12 @@ public class CatalogServiceCatalog extends Catalog {
           "since it is not HdfsTable", dbName, tblName, eventId);
       return;
     }
+    if (isHmsEventSyncDisabled(tbl.getMetaStoreTable())) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} " +
+          "since table/db level flag {} is set to true", dbName, tblName, 
eventId,
+          MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
+      return;
+    }
     if (eventId > 0 && eventId <= tbl.getCreateEventId()) {
       LOG.debug("Not adding write ids to table {}.{} for event {} since it is 
recreated.",
           dbName, tblName, eventId);
@@ -4804,6 +4842,19 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  public boolean 
isHmsEventSyncDisabled(org.apache.hadoop.hive.metastore.api.Table tbl) {
+    if (tbl == null) {
+      return false;
+    }
+    Boolean tblFlagVal = MetastoreTableEvent.getHmsSyncProperty(tbl);
+    if (tblFlagVal != null) {
+      return tblFlagVal;
+    }
+    String dbFlagVal = getDbProperty(tbl.getDbName(),
+        MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey());
+    return Boolean.parseBoolean(dbFlagVal);
+  }
+
   /**
    * This method checks if the version lock is unlocked. If it's still locked 
then it
    * logs an error and unlocks it.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 77110117b..6c8e93e35 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1271,7 +1271,11 @@ public class HdfsTable extends Table implements 
FeFsTable {
           loadConstraintsInfo(msClient, msTbl);
           catalogTimeline.markEvent("Loaded table schema");
         }
-        loadValidWriteIdList(msClient);
+        boolean prevWriteIdChanged = loadValidWriteIdList(msClient);
+        if (prevWriteIdChanged && !loadParams.isLoadPartitionFileMetadata()) {
+          LOG.info("Not skipping file metadata reload since writeId is changed 
in the " +
+              "metastore for the table: " + getFullName());
+        }
         // Set table-level stats first so partition stats can inherit it.
         setTableStats(msTbl);
         // Load partition and file metadata
@@ -1284,7 +1288,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
           }
           storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
           if (msTbl.getPartitionKeysSize() == 0) {
-            if (loadParams.isLoadPartitionFileMetadata()) {
+            if (loadParams.isLoadPartitionFileMetadata() || 
prevWriteIdChanged) {
               storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(
                   msClient, loadParams.getDebugAction(),
                   catalogTimeline);
@@ -2984,7 +2988,14 @@ public class HdfsTable extends Table implements 
FeFsTable {
     if (MetastoreShim.getMajorVersion() > 2 &&
         AcidUtils.isTransactionalTable(msTable_.getParameters())) {
       ValidWriteIdList writeIdList = fetchValidWriteIds(client);
-      prevWriteIdChanged = writeIdList.toString().equals(validWriteIds_);
+      if (validWriteIds_ != null && writeIdList != null) {
+        prevWriteIdChanged = !writeIdList.toString().equals(
+            validWriteIds_.writeToString());
+        if (prevWriteIdChanged) {
+          LOG.info("Valid writeId changed from {} to {}",
+              validWriteIds_.writeToString(), writeIdList.toString());
+        }
+      }
       validWriteIds_ = new MutableValidReaderWriteIdList(writeIdList);
     } else {
       validWriteIds_ = null;
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index b5a8ce139..2405e91f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1113,7 +1113,7 @@ public class MetastoreEvents {
         throws CatalogException {
       try {
         if (!catalog_.reloadTableIfExists(dbName_, tblName_, getEventDesc(), 
getEventId(),
-            skipFileMetadataReload_)) {
+            skipFileMetadataReload_, null)) {
           debugLog("Automatic refresh on table {} failed as the table "
                   + "either does not exist anymore or is not in loaded state.",
               getFullyQualifiedTblName());
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 17c56dd33..5d3b11b26 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -191,30 +191,31 @@ class 
TestEventProcessingCustomConfigsBase(CustomClusterTestSuite):
         # contains the related data
         "create table {0}.{1} (c1 int) {2}".format(db_name,
           acid_no_part_tbl_name, acid_props),
-        "insert into table {0}.{1} values (1) ".format(db_name, 
acid_no_part_tbl_name),
-        "insert overwrite table {0}.{1} select * from {0}.{1}".format(
-          db_name, acid_no_part_tbl_name),
+        # TODO: IMPALA-14305 will fix the transactional self-events detection
+        # "insert into table {0}.{1} values (1) ".format(db_name, 
acid_no_part_tbl_name),
+        # "insert overwrite table {0}.{1} select * from {0}.{1}".format(
+        #   db_name, acid_no_part_tbl_name),
         "{0} {1}.{2}".format(TRUNCATE_TBL_STMT, db_name, 
acid_no_part_tbl_name),
         # the table is empty so the following insert adds 0 rows
-        "insert overwrite table {0}.{1} select * from {0}.{1}".format(
-          db_name, acid_no_part_tbl_name),
+        # "insert overwrite table {0}.{1} select * from {0}.{1}".format(
+        #   db_name, acid_no_part_tbl_name),
         "create table {0}.{1} (c1 int) partitioned by (part int) 
{2}".format(db_name,
           acid_tbl_name, acid_props),
-        "insert into table {0}.{1} partition (part=1) "
-        "values (1) ".format(db_name, acid_tbl_name),
-        "insert into table {0}.{1} partition (part) select id, int_col "
-        "from functional.alltypestiny".format(db_name, acid_tbl_name),
+        # "insert into table {0}.{1} partition (part=1) "
+        # "values (1) ".format(db_name, acid_tbl_name),
+        # "insert into table {0}.{1} partition (part) select id, int_col "
+        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
         # repeat the same insert, now it writes to existing partitions
-        "insert into table {0}.{1} partition (part) select id, int_col "
-        "from functional.alltypestiny".format(db_name, acid_tbl_name),
+        # "insert into table {0}.{1} partition (part) select id, int_col "
+        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
         # following insert overwrite is used instead of truncate, because 
truncate
         # leads to a non-self event that reloads the table
-        "insert overwrite table {0}.{1} partition (part) select id, int_col "
-        "from functional.alltypestiny where id=-1".format(db_name, 
acid_tbl_name),
-        "insert overwrite table {0}.{1} partition (part) select id, int_col "
-        "from functional.alltypestiny".format(db_name, acid_tbl_name),
-        "insert overwrite {0}.{1} partition(part) select * from 
{0}.{1}".format(
-          db_name, acid_tbl_name),
+        # "insert overwrite table {0}.{1} partition (part) select id, int_col "
+        # "from functional.alltypestiny where id=-1".format(db_name, 
acid_tbl_name),
+        # "insert overwrite table {0}.{1} partition (part) select id, int_col "
+        # "from functional.alltypestiny".format(db_name, acid_tbl_name),
+        # "insert overwrite {0}.{1} partition(part) select * from 
{0}.{1}".format(
+        #   db_name, acid_tbl_name),
         # recover partitions will generate add_partition events
         "alter table {0}.{1} recover partitions".format(db_name, 
recover_tbl_name),
         # events processor doesn't process delete column stats events 
currently,
@@ -1643,6 +1644,56 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       assert curr_drop_table_metric == prev_drop_table_metric + 1
       assert curr_create_table_metric == prev_create_table_metric + 1
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=1")
+  def test_hms_sync_disabled_txn_events(self, unique_database):
+    """Verify IMPALA-12829: skip txn events if db/table has disabled applying 
HMS
+    events"""
+    # Iterate through different levels at which HMS sync can be disabled
+    for disable_level in ['table', 'database']:
+      for partitioned in [False, True]:
+        event_id_before = EventProcessorUtils.get_last_synced_event_id()
+        txn_table = "part_tbl" if partitioned else "tbl"
+        full_tbl_name = unique_database + '.' + txn_table
+        part_create = " partitioned by (year int)" if partitioned else ""
+        part_insert = " partition (year=2025)" if partitioned else ""
+
+        create_stmt = "create transactional table {} (i 
int){}".format(full_tbl_name,
+            part_create)
+        self.run_stmt_in_hive(create_stmt)
+        EventProcessorUtils.wait_for_event_processing(self)
+        if disable_level == 'database':
+          self.run_stmt_in_hive(
+            """ALTER DATABASE {} SET DBPROPERTIES 
('impala.disableHmsSync'='true')"""
+            .format(unique_database))
+        else:
+          self.run_stmt_in_hive(
+            """ALTER TABLE {} SET TBLPROPERTIES 
('impala.disableHmsSync'='true')"""
+            .format(full_tbl_name))
+        self.client.execute("select * from {}".format(full_tbl_name))
+        EventProcessorUtils.wait_for_event_processing(self)
+        tbls_refreshed_before = 
EventProcessorUtils.get_int_metric('tables-refreshed', 0)
+        partitions_refreshed_before = EventProcessorUtils.get_int_metric(
+          'partitions-refreshed', 0)
+        self.client.execute(":event_processor('pause')")
+        # commit txn event from insert operation
+        if partitioned:
+          self.client.execute("ALTER TABLE {} ADD PARTITION(year=2025)".format(
+            full_tbl_name))
+        self.run_stmt_in_hive("insert into {} {} values (1),(2),(3)"
+          .format(full_tbl_name, part_insert))
+        self.client.execute(":event_processor('start')")
+        EventProcessorUtils.wait_for_event_processing(self)
+        tbls_refreshed_after = 
EventProcessorUtils.get_int_metric('tables-refreshed', 0)
+        partitions_refreshed_after = EventProcessorUtils.get_int_metric(
+          'partitions-refreshed', 0)
+        assert tbls_refreshed_after == tbls_refreshed_before
+        assert partitions_refreshed_after == partitions_refreshed_before
+        assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
+        self.client.execute("""DROP DATABASE {} 
CASCADE""".format(unique_database))
+        self.client.execute("""CREATE DATABASE {}""".format(unique_database))
+
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):

Reply via email to