This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a99ccdace IMPALA-14230: Add catch-up mode for event processing
a99ccdace is described below

commit a99ccdacea7860af4dececc7d1f7560a15ccaf9e
Author: Yida Wu <[email protected]>
AuthorDate: Tue Feb 3 01:08:04 2026 -0800

    IMPALA-14230: Add catch-up mode for event processing
    
    When HMS events fall far behind (e.g. during HA failover), event
    handling can become a serious bottleneck, especially in the
    single-threaded legacy mode, where heavy operations such as table
    reloads can take minutes and block the whole event processing.
    
    This patch introduces a configurable catch-up mode. If the event lag
    exceeds the newly added flag hms_event_catchup_threshold_s
    (default 1800s), the event processor switches to a faster path:
    it invalidates the table instead of fully processing every event,
    avoiding unnecessary heavy table reloads and speeding up event
    processing.
    
    Catch-up mode is applied only to table events that may trigger heavy
    operations, such as table reloads or operations prone to severe and
    slow lock contention. Lightweight events such as create/drop table
    and database events are excluded since they are already fast.
    
    Testing:
    Added test_catchup_mode_* tests in TestEventProcessingCatchupMode.
    Passed exhaustive tests.
    
    Change-Id: Ib906c06346d5d3159999eeac632e1318bc060065
    Reviewed-on: http://gerrit.cloudera.org:8080/23942
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |   7 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/compat/MetastoreShim.java    |  10 +
 .../impala/catalog/CatalogServiceCatalog.java      |  19 +-
 .../impala/catalog/events/MetastoreEvents.java     |  85 ++++++++
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 tests/custom_cluster/test_events_custom_configs.py | 229 +++++++++++++++++++++
 8 files changed, 355 insertions(+), 3 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 077ec22cb..3f717c8fd 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -117,6 +117,13 @@ DEFINE_int32_hidden(hms_event_sync_sleep_interval_ms, 100, 
"Sleep interval (in m
      "used in the thread of catalogd processing the WaitForHmsEvent RPC. The 
thread "
      "sleeps for such an interval when checking for HMS events to be synced.");
 
+DEFINE_int32(hms_event_catchup_threshold_s, 1800,
+    "Maximum lag time (in seconds) allowed for HMS events before the event 
processor "
+    "switches to 'catch-up' mode. In this mode, some table-level events 
trigger a "
+    "lightweight table invalidation to avoid heavier operations, like table 
reload, "
+    "to speed up the event processing. "
+    "A value of 0 or less disables this feature.");
+
 DECLARE_string(debug_actions);
 DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a 
HMS "
     "server at a port specified by hms_port flag");
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index f806e6f49..e6fdf5b23 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -136,6 +136,7 @@ DECLARE_int32(dbcp_data_source_idle_timeout_s);
 DECLARE_bool(enable_catalogd_ha);
 DECLARE_string(injected_group_members_debug_only);
 DECLARE_int32(hms_event_sync_sleep_interval_ms);
+DECLARE_int32(hms_event_catchup_threshold_s);
 DECLARE_int32(catalog_delete_log_ttl);
 DECLARE_bool(enable_hierarchical_event_processing);
 DECLARE_int32(num_db_event_executors);
@@ -571,6 +572,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
 #endif
   cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
   
cfg.__set_hms_event_sync_sleep_interval_ms(FLAGS_hms_event_sync_sleep_interval_ms);
+  cfg.__set_hms_event_catchup_threshold_s(FLAGS_hms_event_catchup_threshold_s);
   cfg.__set_catalog_delete_log_ttl(FLAGS_catalog_delete_log_ttl);
   cfg.__set_enable_hierarchical_event_processing(
       FLAGS_enable_hierarchical_event_processing);
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index d43ec8b7e..afc8f5320 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -363,4 +363,6 @@ struct TBackendGflags {
   166: required i32 max_stmt_metadata_loader_threads
 
   167: required bool disable_hms_sync_by_default
+
+  168: required i32 hms_event_catchup_threshold_s
 }
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 9931e3aa2..4b132ee5c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -34,6 +34,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -951,6 +952,13 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
           txnId_);
     }
 
+    @Override
+    protected Collection<TableName> getTableNames() {
+      return tableWriteIds_.stream()
+          .map(writeId -> new TableName(writeId.getDbName(), 
writeId.getTblName()))
+          .collect(Collectors.toSet());
+    }
+
     @Override
     public String getTargetName() {
       if (tableNames_.isEmpty()) return CLUSTER_WIDE_TARGET;
@@ -959,6 +967,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
 
     @Override
     protected void process() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       // Via getAllWriteEventInfo, we can get data insertion info for 
transactional tables
       // even though there are no insert events generated for transactional 
tables. Note
       // that we cannot get DDL info from this API.
@@ -1156,6 +1165,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
 
     @Override
     protected void processTableEvent() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       try {
         addCommittedWriteIdsAndReload(getCatalogOpExecutor(), dbName_, 
tblName_,
             isPartitioned_, isMaterializedView_, writeIdsInEvent_, partitions_,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index f3402d2d3..41599f3c0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3418,16 +3418,20 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Invalidate the table if it exists by overwriting existing entry by a 
Incomplete
-   * Table.
-   * @return null if the table does not exist else return the invalidated table
+   * Table. If skipIfInvalidated true, the invalidation is skipped if the 
table is
+   * already an instance of IncompleteTable.
+   * @return null if the table does not exist or skipIfInvalidated is on and 
table is
+   * invalidated else return the invalidated table
    */
-  public @Nullable Table invalidateTableIfExists(String dbName, String 
tblName) {
+  public @Nullable Table invalidateTableIfExists(
+      String dbName, String tblName, boolean skipIfInvalidated) {
     Table incompleteTable;
     try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(dbName)) {
       Db db = result.getDb();
       if (db == null) return null;
       Table existingTbl = db.getTable(tblName);
       if (existingTbl == null) return null;
+      if (skipIfInvalidated && existingTbl instanceof IncompleteTable) return 
null;
       incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
           existingTbl.getTableType(), existingTbl.getTableComment(),
           existingTbl.getCreateEventId());
@@ -3438,6 +3442,15 @@ public class CatalogServiceCatalog extends Catalog {
     return incompleteTable;
   }
 
+  /**
+   * Invalidate the table if it exists by overwriting existing entry by a 
Incomplete
+   * Table.
+   * @return null if the table does not exist else return the invalidated table
+   */
+  public @Nullable Table invalidateTableIfExists(String dbName, String 
tblName) {
+    return invalidateTableIfExists(dbName, tblName, false /* skipIfInvalidated 
*/);
+  }
+
   private void scheduleTableLoading(String dbName, String tableName) {
     TTableName tTblName = new TTableName(dbName.toLowerCase(), 
tableName.toLowerCase());
     TTableName tDbName = new TTableName(dbName.toLowerCase(), "*");
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 5b67b3274..fa93f1d54 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1058,6 +1059,60 @@ public class MetastoreEvents {
       }
       return false;
     }
+
+    /**
+     * Checks if the event lag exceeds the configured threshold.
+     * @return the lag value if the event is lagging and should enter catch-up 
mode.
+     * @return 0 if no lagging.
+     */
+    protected long evaluateCatchUpLag() {
+      int threshold = BackendConfig.INSTANCE.getHmsEventCatchUpThreshold();
+      if (threshold <= 0) return 0;
+      long lag = (System.currentTimeMillis() / 1000) - getEventTime();
+      return lag > threshold ? lag : 0;
+    }
+
+    /**
+     * Checks whether the event is in catch-up mode by comparing the event lag 
with the
+     * configured threshold. If in catch-up mode, tries to invalidate the 
table and
+     * returns true to notify the caller to skip the event. Otherwise, returns 
false.
+     * Note: This should be evaluated before isSelfEvent() and isOlderEvent() 
to avoid
+     * hitting table locks, which can get stuck waiting for a lock 
(IMPALA-12461).
+     */
+    protected boolean handleIfInCatchUpMode() {
+      long lag = evaluateCatchUpLag();
+      if (lag > 0) {
+        // It is okay if the table does not exist and can't be invalidated, 
the event is
+        // skipped anyway.
+        infoLog("Catch-up Mode: Skipping event due to event lag of {}s", lag);
+        Collection<TableName> tables = getTableNames();
+        Preconditions.checkNotNull(tables, "getTableNames() must return 
non-null");
+        for (TableName tableName : tables) {
+          if (catalog_.invalidateTableIfExists(
+                  tableName.getDb(), tableName.getTbl(), true)
+              != null) {
+            infoLog("Catch-up Mode: Invalidated table {}.{} due to event lag 
of {}s",
+                tableName.getDb(), tableName.getTbl(), lag);
+          }
+        }
+
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        debugLog("Incremented skipped metric to "
+            + 
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                  .getCount());
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Overridden by subclasses to invalidate specific tables during catch-up 
mode.
+     * Must return non-null.
+     */
+    protected Collection<TableName> getTableNames() {
+      throw new UnsupportedOperationException(
+          "getTableNames() is not supported for event type: " + 
getEventType());
+    }
   }
 
   /**
@@ -1221,6 +1276,11 @@ public class MetastoreEvents {
     // the partition object from the events which are batched together.
     protected Partition getPartitionForBatching() { return null; }
 
+    @Override
+    protected Collection<TableName> getTableNames() {
+      return Collections.singletonList(new TableName(dbName_, tblName_));
+    }
+
     protected MetastoreTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics 
metrics,
         NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
@@ -1938,6 +1998,9 @@ public class MetastoreEvents {
 
     @Override
     public void processTableEvent() throws MetastoreNotificationException {
+      // Evaluate catch-up mode before isSelfEvent() and isOlderEvent() to 
avoid
+      // hitting table locks, which can get stuck waiting for a lock.
+      if (handleIfInCatchUpMode()) return;
       if (isSelfEvent()) {
         infoLog("Not processing the insert event as it is a self-event");
         return;
@@ -2129,6 +2192,8 @@ public class MetastoreEvents {
         return;
       }
 
+      if (handleIfInCatchUpMode()) return;
+
       // Determine whether this is an event which we have already seen or if 
it is a new
       // event
       if (isSelfEvent()) {
@@ -2869,6 +2934,7 @@ public class MetastoreEvents {
         infoLog("Partition list is empty. Ignoring this event.");
         return;
       }
+      if (handleIfInCatchUpMode()) return;
       try {
         // Reload the whole table if it's a transactional table and incremental
         // refresh is not enabled. Materialized views are treated as a special 
case
@@ -3013,6 +3079,7 @@ public class MetastoreEvents {
     @Override
     public void processTableEvent() throws MetastoreNotificationException,
         CatalogException {
+      if (handleIfInCatchUpMode()) return;
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -3099,6 +3166,7 @@ public class MetastoreEvents {
             "received in the event.", getEventId());
         return;
       }
+      if (handleIfInCatchUpMode()) return;
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -3199,6 +3267,7 @@ public class MetastoreEvents {
     @Override
     protected void processTableEvent() throws MetastoreNotificationException,
         CatalogException {
+      if (handleIfInCatchUpMode()) return;
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -3358,6 +3427,7 @@ public class MetastoreEvents {
         infoLog("Partition list is empty. Ignoring this event.");
         return;
       }
+      if (handleIfInCatchUpMode()) return;
       try {
         // Reload the whole table if it's a transactional table or 
materialized view.
         // Materialized views are treated as a special case because it's 
possible to
@@ -3434,6 +3504,7 @@ public class MetastoreEvents {
             getFullyQualifiedTblName());
         return;
       }
+      if (handleIfInCatchUpMode()) return;
       try {
         List<Long> writeIds = txnToWriteIdList_.stream()
             .map(TxnToWriteId::getWriteId)
@@ -3516,6 +3587,7 @@ public class MetastoreEvents {
 
     @Override
     public void processTableEvent() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -3692,8 +3764,19 @@ public class MetastoreEvents {
       return tableWriteIds_;
     }
 
+    @Override
+    protected Collection<TableName> getTableNames() {
+      if (tableWriteIds_ != null) {
+        return tableWriteIds_.stream()
+            .map(writeId -> new TableName(writeId.getDbName(), 
writeId.getTblName()))
+            .collect(Collectors.toSet());
+      }
+      return Collections.emptyList();
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       try {
         infoLog("Adding {} aborted write ids for txn id {}", 
tableWriteIds_.size(),
             txnId_);
@@ -3774,6 +3857,7 @@ public class MetastoreEvents {
 
     @Override
     protected void processTableEvent() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       try {
         catalog_.addWriteIdsToTable(getDbName(), getTableName(), getEventId(), 
writeIds_,
             MutableValidWriteIdList.WriteIdStatus.ABORTED);
@@ -3838,6 +3922,7 @@ public class MetastoreEvents {
 
     @Override
     protected void processTableEvent() throws MetastoreNotificationException {
+      if (handleIfInCatchUpMode()) return;
       try {
         if (partitionName_ == null) {
           boolean notSkipped = reloadTableFromCatalog(true);
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index efd4b6383..b6934e666 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -349,6 +349,10 @@ public class BackendConfig {
     return backendCfg_.hms_event_incremental_refresh_transactional_table;
   }
 
+  public int getHmsEventCatchUpThreshold() {
+    return backendCfg_.hms_event_catchup_threshold_s;
+  }
+
   public boolean isAutoCheckCompaction() {
     return backendCfg_.auto_check_compaction;
   }
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 93099e934..55f7b2ea2 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -2083,3 +2083,232 @@ class 
TestEventSyncWaiting(TestEventProcessingCustomConfigsBase):
         "alter table {} partition(p=0) compact 'minor' and wait".format(tbl))
     res = self.execute_query_expect_success(client, "show files in " + tbl)
     assert len(res.data) == 1
+
+
[email protected]
[email protected]_args(
+    catalogd_args="--hms_event_polling_interval_s=1 "
+                  "--hms_event_catchup_threshold_s=2 "
+                  "--enable_hierarchical_event_processing=false "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@3500",
+    disable_log_buffering=True, cluster_size=1)
+class TestEventProcessingCatchupMode(TestEventProcessingCustomConfigsBase):
+  """
+  Tests for the event processor catch-up mode.
+  The cluster is configured with a 2s catch-up threshold, and a 3.5s delay is
+  injected in event processing to trigger catch-up mode for the tests.
+  """
+
+  @pytest.mark.execute_serially
+  def test_catchup_mode_alter_table(self, unique_database):
+    """
+    Tests that the event processor switches to catch-up mode to invalidate the 
table
+    and skip the alter table events when the event lag exceeds the configured 
threshold.
+    """
+    tbl = unique_database + ".catchup_test_tbl"
+    self.client.execute("create table {} (i int) partitioned by (p 
int)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("alter table {} add partition (p=1)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("alter table {} add partition (p=2)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("alter table {} add partition (p=3)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    # Verify the logs.
+    log_regex = r"Invalidated table {} due to event lag of .*s".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=3, 
timeout_s=20)
+    log_regex =\
+      r"{}.* Catch-up Mode: Skipping event .*s".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=3, 
timeout_s=20)
+
+    # Verify consistency.
+    res = self.client.execute("show partitions {}".format(tbl))
+    assert len(res.data) == 4
+    assert 'p=1' in res.get_data()
+    assert 'p=2' in res.get_data()
+    assert 'p=3' in res.get_data()
+
+  @pytest.mark.execute_serially
+  def test_catchup_mode_partition_events(self, unique_database):
+    """
+    Tests that various event types (INSERT, ADD_PARTITION, DROP_PARTITION)
+    trigger catch-up mode when the event processor is lagging.
+    """
+    tbl = unique_database + ".catchup_partition_events_tbl"
+    self.client.execute("create table {} (i int) partitioned by (p 
int)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("alter table {} add partition (p=1)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    # Verify ADD_PARTITION logs.
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: ADD_PARTITION .* {}.* Catch-up Mode: 
Skipping".format(tbl),
+        expected_count=1, timeout_s=20)
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: ADD_PARTITION .* Invalidated table {}".format(tbl),
+        expected_count=1, timeout_s=20)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("insert into {} partition (p=1) values 
(100)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    # Verify INSERT logs.
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: INSERT .* {}.* Catch-up Mode: Skipping".format(tbl),
+        expected_count=1, timeout_s=20)
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: INSERT .* Invalidated table {}".format(tbl),
+        expected_count=1, timeout_s=20)
+    self.client.execute("show partitions {}".format(tbl))
+    self.run_stmt_in_hive("alter table {} drop partition (p=1)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    # Verify DROP_PARTITION logs.
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: DROP_PARTITION .* {}.* Catch-up Mode: 
Skipping".format(tbl),
+        expected_count=1, timeout_s=20)
+    self.assert_catalogd_log_contains("INFO",
+        r"EventType: DROP_PARTITION .* Invalidated table {}".format(tbl),
+        expected_count=1, timeout_s=20)
+    EventProcessorUtils.wait_for_event_processing(self)
+    res = self.client.execute("show partitions {}".format(tbl))
+    assert 'p=1' not in res.get_data()
+
+  def _run_catchup_mode_txn_test(self, unique_database, is_commit, ev_type):
+    tbl1_name = "catchup_txn_tbl1_" + ev_type.lower()
+    tbl2_name = "catchup_txn_tbl2_" + ev_type.lower()
+    tbl1 = unique_database + "." + tbl1_name
+    tbl2 = unique_database + "." + tbl2_name
+
+    # Create multiple transactional tables for testing.
+    self.client.execute(
+        "create table {} (i int) "
+        "tblproperties ('transactional'='true', "
+        "'transactional_properties'='insert_only')".format(tbl1))
+    self.client.execute(
+        "create table {} (i int) "
+        "tblproperties ('transactional'='true', "
+        "'transactional_properties'='insert_only')".format(tbl2))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    # Use the AcidTxn to manually control the transaction to allow us load the 
tables
+    # before the commit or abort txn event for invalidating the tables by this 
event.
+    acid = AcidTxn(self.hive_client)
+    txn_id = acid.open_txns()
+
+    # Triggers ALLOC_WRITE_ID events for later commit or abort txn.
+    acid.allocate_table_write_ids(txn_id, unique_database, tbl1_name)
+    acid.allocate_table_write_ids(txn_id, unique_database, tbl2_name)
+    EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+
+    # Load the tables for commit or abort txn event having something to 
invalidate.
+    self.client.execute("describe {}".format(tbl1))
+    self.client.execute("describe {}".format(tbl2))
+
+    # Triggers commit or abort txn to invalidate tables in catch-up mode.
+    events_skipped_before = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+    if is_commit:
+      acid.commit_txn(txn_id)
+    else:
+      acid.abort_txn(txn_id)
+    EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+    events_skipped_after = 
EventProcessorUtils.get_int_metric('events-skipped', 0)
+    assert events_skipped_after == events_skipped_before + 1
+
+    # Verify the logs.
+    log_regex1 =\
+      r"{} .* Invalidated table {} due to event lag of .*s".format(ev_type, 
tbl1)
+    self.assert_catalogd_log_contains("INFO", log_regex1, expected_count=1, 
timeout_s=20)
+    log_regex2 =\
+      r"{} .* Invalidated table {} due to event lag of .*s".format(ev_type, 
tbl2)
+    self.assert_catalogd_log_contains("INFO", log_regex2, expected_count=1, 
timeout_s=20)
+
+  @pytest.mark.execute_serially
+  def test_catchup_mode_commit_txn(self, unique_database):
+    """
+    Tests that CommitTxn events cause the event processor to switch to 
catch-up mode
+    to invalidate multiple involved tables when the processing delay exceeds 
the
+    configured threshold.
+    """
+    self._run_catchup_mode_txn_test(
+      unique_database, is_commit=True, ev_type='COMMIT_TXN')
+
+  @pytest.mark.execute_serially
+  def test_catchup_mode_abort_txn(self, unique_database):
+    """
+    Tests that AbortTxn events cause the event processor to switch to catch-up 
mode
+    to invalidate multiple involved tables when the processing delay exceeds 
the
+    configured threshold.
+    """
+    self._run_catchup_mode_txn_test(
+      unique_database, is_commit=False, ev_type='ABORT_TXN')
+
+  @pytest.mark.execute_serially
+  def test_catchup_mode_iceberg(self, unique_database):
+    """
+    Tests that iceberg ingestion generates ALTER_TABLE events, when triggering
+    rapid insertions from Hive, it can trigger catch-up mode.
+    """
+    tbl_name = "catchup_mode_iceberg_tbl"
+    tbl = unique_database + "." + tbl_name
+
+    # Create Iceberg table.
+    self.client.execute(
+        "create table {} (i int, s string, p int) "
+        "partitioned by spec(p) stored as iceberg".format(tbl))
+    self.client.execute("insert into {} values (0, 'aaa', 0)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("describe {}".format(tbl))
+
+    # Trigger hive insertions to have external ALTER_TABLE events rapidly.
+    self.run_stmt_in_hive(
+        "insert into {0} values (1, 'bbb', 1); "
+        "insert into {0} values (2, 'ccc', 2); "
+        "insert into {0} values (3, 'ddd', 3); "
+        "insert into {0} values (4, 'eee', 4); "
+        "insert into {0} values (5, 'fff', 5);".format(tbl)
+    )
+    EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+
+    # Verify logs.
+    log_regex = r"ALTER_TABLE .*{}.* Catch-up Mode: Skipping".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=6, 
timeout_s=20)
+    log_regex =\
+      r"ALTER_TABLE .* Invalidated table {} due to event lag of 
.*s".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=2, 
timeout_s=20)
+
+    # Verify consistency.
+    res = self.client.execute("select count(*) from {}".format(tbl))
+    assert '6' in res.get_data()
+
+
[email protected]
+class 
TestEventProcessingCatchupModeCustom(TestEventProcessingCustomConfigsBase):
+
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--hms_event_polling_interval_s=1 "
+                    "--hms_event_catchup_threshold_s=2 "
+                    "--enable_hierarchical_event_processing=false",
+      disable_log_buffering=True, cluster_size=1)
+  def test_catchup_mode_compute_stats(self, unique_database):
+    """
+    Tests ALTER_TABLE events lagging generated by a slow COMPUTE STATS.
+    """
+    tbl = unique_database + ".catchup_stats_tbl"
+    self.client.execute("create table {} (i int) partitioned by (p 
int)".format(tbl))
+    self.client.execute("insert into {} partition (p=1) values 
(1)".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    self.client.execute("describe {}".format(tbl))
+
+    # Trigger COMPUTE STATS to generate ALTER_TABLE events.
+    # The catalogd_update_stats_delay causes at least 3.5s event lag.
+    self.client.execute("set 
debug_action='catalogd_update_stats_delay:SLEEP@3500'")
+    self.client.execute("compute stats {}".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    # Verify the logs.
+    log_regex = r"ALTER_TABLE.*Invalidated table {} due to event 
lag".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=1, 
timeout_s=20)
+    log_regex = r"ALTER_TABLE.*{}.* Catch-up Mode: Skipping".format(tbl)
+    self.assert_catalogd_log_contains("INFO", log_regex, expected_count=1, 
timeout_s=20)

Reply via email to