This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a99ccdace IMPALA-14230: Add catch-up mode for event processing
a99ccdace is described below
commit a99ccdacea7860af4dececc7d1f7560a15ccaf9e
Author: Yida Wu <[email protected]>
AuthorDate: Tue Feb 3 01:08:04 2026 -0800
IMPALA-14230: Add catch-up mode for event processing
When HMS events fall far behind (e.g. during HA failover), event
handling can become a serious bottleneck, especially in the single
threaded legacy mode: heavy operations such as table reloads can
take minutes, blocking the whole event processing pipeline.
This patch introduces a configurable catch-up mode. If the event lag
exceeds the newly added flag hms_event_catchup_threshold_s
(default 1800s), the event processor switches to a faster path:
instead of fully processing each event, it invalidates the affected
table, avoiding unnecessary heavy table reloads and thereby speeding
up event processing.
Catch-up mode is applied only to table events that may trigger heavy
operations, such as table reloads or operations that may suffer
severe lock contention. Lightweight events such as
create/drop table and database events are excluded since they are
already fast.
Testing:
Added test_catchup_mode_* tests in TestEventProcessingCatchupMode.
Passed exhaustive tests.
Change-Id: Ib906c06346d5d3159999eeac632e1318bc060065
Reviewed-on: http://gerrit.cloudera.org:8080/23942
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/catalog/catalog-server.cc | 7 +
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/compat/MetastoreShim.java | 10 +
.../impala/catalog/CatalogServiceCatalog.java | 19 +-
.../impala/catalog/events/MetastoreEvents.java | 85 ++++++++
.../org/apache/impala/service/BackendConfig.java | 4 +
tests/custom_cluster/test_events_custom_configs.py | 229 +++++++++++++++++++++
8 files changed, 355 insertions(+), 3 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 077ec22cb..3f717c8fd 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -117,6 +117,13 @@ DEFINE_int32_hidden(hms_event_sync_sleep_interval_ms, 100,
"Sleep interval (in m
"used in the thread of catalogd processing the WaitForHmsEvent RPC. The
thread "
"sleeps for such an interval when checking for HMS events to be synced.");
+DEFINE_int32(hms_event_catchup_threshold_s, 1800,
+ "Maximum lag time (in seconds) allowed for HMS events before the event
processor "
+ "switches to 'catch-up' mode. In this mode, some table-level events
trigger a "
+ "lightweight table invalidation to avoid heavier operations, like table
reload, "
+ "to speed up the event processing. "
+ "A value of 0 or less disables this feature.");
+
DECLARE_string(debug_actions);
DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a
HMS "
"server at a port specified by hms_port flag");
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index f806e6f49..e6fdf5b23 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -136,6 +136,7 @@ DECLARE_int32(dbcp_data_source_idle_timeout_s);
DECLARE_bool(enable_catalogd_ha);
DECLARE_string(injected_group_members_debug_only);
DECLARE_int32(hms_event_sync_sleep_interval_ms);
+DECLARE_int32(hms_event_catchup_threshold_s);
DECLARE_int32(catalog_delete_log_ttl);
DECLARE_bool(enable_hierarchical_event_processing);
DECLARE_int32(num_db_event_executors);
@@ -571,6 +572,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
#endif
cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
cfg.__set_hms_event_sync_sleep_interval_ms(FLAGS_hms_event_sync_sleep_interval_ms);
+ cfg.__set_hms_event_catchup_threshold_s(FLAGS_hms_event_catchup_threshold_s);
cfg.__set_catalog_delete_log_ttl(FLAGS_catalog_delete_log_ttl);
cfg.__set_enable_hierarchical_event_processing(
FLAGS_enable_hierarchical_event_processing);
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index d43ec8b7e..afc8f5320 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -363,4 +363,6 @@ struct TBackendGflags {
166: required i32 max_stmt_metadata_loader_threads
167: required bool disable_hms_sync_by_default
+
+ 168: required i32 hms_event_catchup_threshold_s
}
diff --git
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 9931e3aa2..4b132ee5c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -34,6 +34,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -951,6 +952,13 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
txnId_);
}
+ @Override
+ protected Collection<TableName> getTableNames() {
+ return tableWriteIds_.stream()
+ .map(writeId -> new TableName(writeId.getDbName(),
writeId.getTblName()))
+ .collect(Collectors.toSet());
+ }
+
@Override
public String getTargetName() {
if (tableNames_.isEmpty()) return CLUSTER_WIDE_TARGET;
@@ -959,6 +967,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
@Override
protected void process() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
// Via getAllWriteEventInfo, we can get data insertion info for
transactional tables
// even though there are no insert events generated for transactional
tables. Note
// that we cannot get DDL info from this API.
@@ -1156,6 +1165,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase
{
@Override
protected void processTableEvent() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
try {
addCommittedWriteIdsAndReload(getCatalogOpExecutor(), dbName_,
tblName_,
isPartitioned_, isMaterializedView_, writeIdsInEvent_, partitions_,
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index f3402d2d3..41599f3c0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3418,16 +3418,20 @@ public class CatalogServiceCatalog extends Catalog {
/**
* Invalidate the table if it exists by overwriting existing entry by a
Incomplete
- * Table.
- * @return null if the table does not exist else return the invalidated table
+ * Table. If skipIfInvalidated true, the invalidation is skipped if the
table is
+ * already an instance of IncompleteTable.
+ * @return null if the table does not exist or skipIfInvalidated is on and
table is
+ * invalidated else return the invalidated table
*/
- public @Nullable Table invalidateTableIfExists(String dbName, String
tblName) {
+ public @Nullable Table invalidateTableIfExists(
+ String dbName, String tblName, boolean skipIfInvalidated) {
Table incompleteTable;
try (WriteLockAndLookupDb result = new WriteLockAndLookupDb(dbName)) {
Db db = result.getDb();
if (db == null) return null;
Table existingTbl = db.getTable(tblName);
if (existingTbl == null) return null;
+ if (skipIfInvalidated && existingTbl instanceof IncompleteTable) return
null;
incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
existingTbl.getTableType(), existingTbl.getTableComment(),
existingTbl.getCreateEventId());
@@ -3438,6 +3442,15 @@ public class CatalogServiceCatalog extends Catalog {
return incompleteTable;
}
+ /**
+ * Invalidate the table if it exists by overwriting existing entry by a
Incomplete
+ * Table.
+ * @return null if the table does not exist else return the invalidated table
+ */
+ public @Nullable Table invalidateTableIfExists(String dbName, String
tblName) {
+ return invalidateTableIfExists(dbName, tblName, false /* skipIfInvalidated
*/);
+ }
+
private void scheduleTableLoading(String dbName, String tableName) {
TTableName tTblName = new TTableName(dbName.toLowerCase(),
tableName.toLowerCase());
TTableName tDbName = new TTableName(dbName.toLowerCase(), "*");
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 5b67b3274..fa93f1d54 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -1058,6 +1059,60 @@ public class MetastoreEvents {
}
return false;
}
+
+ /**
+ * Checks if the event lag exceeds the configured threshold.
+ * @return the lag value if the event is lagging and should enter catch-up
mode.
+ * @return 0 if no lagging.
+ */
+ protected long evaluateCatchUpLag() {
+ int threshold = BackendConfig.INSTANCE.getHmsEventCatchUpThreshold();
+ if (threshold <= 0) return 0;
+ long lag = (System.currentTimeMillis() / 1000) - getEventTime();
+ return lag > threshold ? lag : 0;
+ }
+
+ /**
+ * Checks whether the event is in catch-up mode by comparing the event lag
with the
+ * configured threshold. If in catch-up mode, tries to invalidate the
table and
+ * returns true to notify the caller to skip the event. Otherwise, returns
false.
+ * Note: This should be evaluated before isSelfEvent() and isOlderEvent()
to avoid
+ * hitting table locks, which can get stuck waiting for a lock
(IMPALA-12461).
+ */
+ protected boolean handleIfInCatchUpMode() {
+ long lag = evaluateCatchUpLag();
+ if (lag > 0) {
+ // It is okay if the table does not exist and can't be invalidated,
the event is
+ // skipped anyway.
+ infoLog("Catch-up Mode: Skipping event due to event lag of {}s", lag);
+ Collection<TableName> tables = getTableNames();
+ Preconditions.checkNotNull(tables, "getTableNames() must return
non-null");
+ for (TableName tableName : tables) {
+ if (catalog_.invalidateTableIfExists(
+ tableName.getDb(), tableName.getTbl(), true)
+ != null) {
+ infoLog("Catch-up Mode: Invalidated table {}.{} due to event lag
of {}s",
+ tableName.getDb(), tableName.getTbl(), lag);
+ }
+ }
+
+
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+ debugLog("Incremented skipped metric to "
+ +
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+ .getCount());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Overridden by subclasses to invalidate specific tables during catch-up
mode.
+ * Must return non-null.
+ */
+ protected Collection<TableName> getTableNames() {
+ throw new UnsupportedOperationException(
+ "getTableNames() is not supported for event type: " +
getEventType());
+ }
}
/**
@@ -1221,6 +1276,11 @@ public class MetastoreEvents {
// the partition object from the events which are batched together.
protected Partition getPartitionForBatching() { return null; }
+ @Override
+ protected Collection<TableName> getTableNames() {
+ return Collections.singletonList(new TableName(dbName_, tblName_));
+ }
+
protected MetastoreTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics
metrics,
NotificationEvent event) {
super(catalogOpExecutor, metrics, event);
@@ -1938,6 +1998,9 @@ public class MetastoreEvents {
@Override
public void processTableEvent() throws MetastoreNotificationException {
+ // Evaluate catch-up mode before isSelfEvent() and isOlderEvent() to
avoid
+ // hitting table locks, which can get stuck waiting for a lock.
+ if (handleIfInCatchUpMode()) return;
if (isSelfEvent()) {
infoLog("Not processing the insert event as it is a self-event");
return;
@@ -2129,6 +2192,8 @@ public class MetastoreEvents {
return;
}
+ if (handleIfInCatchUpMode()) return;
+
// Determine whether this is an event which we have already seen or if
it is a new
// event
if (isSelfEvent()) {
@@ -2869,6 +2934,7 @@ public class MetastoreEvents {
infoLog("Partition list is empty. Ignoring this event.");
return;
}
+ if (handleIfInCatchUpMode()) return;
try {
// Reload the whole table if it's a transactional table and incremental
// refresh is not enabled. Materialized views are treated as a special
case
@@ -3013,6 +3079,7 @@ public class MetastoreEvents {
@Override
public void processTableEvent() throws MetastoreNotificationException,
CatalogException {
+ if (handleIfInCatchUpMode()) return;
if (isSelfEvent()) {
infoLog("Not processing the event as it is a self-event");
return;
@@ -3099,6 +3166,7 @@ public class MetastoreEvents {
"received in the event.", getEventId());
return;
}
+ if (handleIfInCatchUpMode()) return;
if (isSelfEvent()) {
infoLog("Not processing the event as it is a self-event");
return;
@@ -3199,6 +3267,7 @@ public class MetastoreEvents {
@Override
protected void processTableEvent() throws MetastoreNotificationException,
CatalogException {
+ if (handleIfInCatchUpMode()) return;
if (isSelfEvent()) {
infoLog("Not processing the event as it is a self-event");
return;
@@ -3358,6 +3427,7 @@ public class MetastoreEvents {
infoLog("Partition list is empty. Ignoring this event.");
return;
}
+ if (handleIfInCatchUpMode()) return;
try {
// Reload the whole table if it's a transactional table or
materialized view.
// Materialized views are treated as a special case because it's
possible to
@@ -3434,6 +3504,7 @@ public class MetastoreEvents {
getFullyQualifiedTblName());
return;
}
+ if (handleIfInCatchUpMode()) return;
try {
List<Long> writeIds = txnToWriteIdList_.stream()
.map(TxnToWriteId::getWriteId)
@@ -3516,6 +3587,7 @@ public class MetastoreEvents {
@Override
public void processTableEvent() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
if (isSelfEvent()) {
infoLog("Not processing the event as it is a self-event");
return;
@@ -3692,8 +3764,19 @@ public class MetastoreEvents {
return tableWriteIds_;
}
+ @Override
+ protected Collection<TableName> getTableNames() {
+ if (tableWriteIds_ != null) {
+ return tableWriteIds_.stream()
+ .map(writeId -> new TableName(writeId.getDbName(),
writeId.getTblName()))
+ .collect(Collectors.toSet());
+ }
+ return Collections.emptyList();
+ }
+
@Override
protected void process() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
try {
infoLog("Adding {} aborted write ids for txn id {}",
tableWriteIds_.size(),
txnId_);
@@ -3774,6 +3857,7 @@ public class MetastoreEvents {
@Override
protected void processTableEvent() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
try {
catalog_.addWriteIdsToTable(getDbName(), getTableName(), getEventId(),
writeIds_,
MutableValidWriteIdList.WriteIdStatus.ABORTED);
@@ -3838,6 +3922,7 @@ public class MetastoreEvents {
@Override
protected void processTableEvent() throws MetastoreNotificationException {
+ if (handleIfInCatchUpMode()) return;
try {
if (partitionName_ == null) {
boolean notSkipped = reloadTableFromCatalog(true);
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index efd4b6383..b6934e666 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -349,6 +349,10 @@ public class BackendConfig {
return backendCfg_.hms_event_incremental_refresh_transactional_table;
}
+ public int getHmsEventCatchUpThreshold() {
+ return backendCfg_.hms_event_catchup_threshold_s;
+ }
+
public boolean isAutoCheckCompaction() {
return backendCfg_.auto_check_compaction;
}
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index 93099e934..55f7b2ea2 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -2083,3 +2083,232 @@ class
TestEventSyncWaiting(TestEventProcessingCustomConfigsBase):
"alter table {} partition(p=0) compact 'minor' and wait".format(tbl))
res = self.execute_query_expect_success(client, "show files in " + tbl)
assert len(res.data) == 1
+
+
[email protected]
[email protected]_args(
+ catalogd_args="--hms_event_polling_interval_s=1 "
+ "--hms_event_catchup_threshold_s=2 "
+ "--enable_hierarchical_event_processing=false "
+ "--debug_actions=catalogd_event_processing_delay:SLEEP@3500",
+ disable_log_buffering=True, cluster_size=1)
+class TestEventProcessingCatchupMode(TestEventProcessingCustomConfigsBase):
+ """
+ Tests for the event processor catch-up mode.
+ The cluster is configured with a 2s catch-up threshold, and a 3.5s delay is
+ injected in event processing to trigger catch-up mode for the tests.
+ """
+
+ @pytest.mark.execute_serially
+ def test_catchup_mode_alter_table(self, unique_database):
+ """
+ Tests that the event processor switches to catch-up mode to invalidate the
table
+ and skip the alter table events when the event lag exceeds the configured
threshold.
+ """
+ tbl = unique_database + ".catchup_test_tbl"
+ self.client.execute("create table {} (i int) partitioned by (p
int)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("alter table {} add partition (p=1)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("alter table {} add partition (p=2)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("alter table {} add partition (p=3)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ # Verify the logs.
+ log_regex = r"Invalidated table {} due to event lag of .*s".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=3,
timeout_s=20)
+ log_regex =\
+ r"{}.* Catch-up Mode: Skipping event .*s".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=3,
timeout_s=20)
+
+ # Verify consistency.
+ res = self.client.execute("show partitions {}".format(tbl))
+ assert len(res.data) == 4
+ assert 'p=1' in res.get_data()
+ assert 'p=2' in res.get_data()
+ assert 'p=3' in res.get_data()
+
+ @pytest.mark.execute_serially
+ def test_catchup_mode_partition_events(self, unique_database):
+ """
+ Tests that various event types (INSERT, ADD_PARTITION, DROP_PARTITION)
+ trigger catch-up mode when the event processor is lagging.
+ """
+ tbl = unique_database + ".catchup_partition_events_tbl"
+ self.client.execute("create table {} (i int) partitioned by (p
int)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("alter table {} add partition (p=1)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ # Verify ADD_PARTITION logs.
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: ADD_PARTITION .* {}.* Catch-up Mode:
Skipping".format(tbl),
+ expected_count=1, timeout_s=20)
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: ADD_PARTITION .* Invalidated table {}".format(tbl),
+ expected_count=1, timeout_s=20)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("insert into {} partition (p=1) values
(100)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ # Verify INSERT logs.
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: INSERT .* {}.* Catch-up Mode: Skipping".format(tbl),
+ expected_count=1, timeout_s=20)
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: INSERT .* Invalidated table {}".format(tbl),
+ expected_count=1, timeout_s=20)
+ self.client.execute("show partitions {}".format(tbl))
+ self.run_stmt_in_hive("alter table {} drop partition (p=1)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ # Verify DROP_PARTITION logs.
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: DROP_PARTITION .* {}.* Catch-up Mode:
Skipping".format(tbl),
+ expected_count=1, timeout_s=20)
+ self.assert_catalogd_log_contains("INFO",
+ r"EventType: DROP_PARTITION .* Invalidated table {}".format(tbl),
+ expected_count=1, timeout_s=20)
+ EventProcessorUtils.wait_for_event_processing(self)
+ res = self.client.execute("show partitions {}".format(tbl))
+ assert 'p=1' not in res.get_data()
+
+ def _run_catchup_mode_txn_test(self, unique_database, is_commit, ev_type):
+ tbl1_name = "catchup_txn_tbl1_" + ev_type.lower()
+ tbl2_name = "catchup_txn_tbl2_" + ev_type.lower()
+ tbl1 = unique_database + "." + tbl1_name
+ tbl2 = unique_database + "." + tbl2_name
+
+ # Create multiple transactional tables for testing.
+ self.client.execute(
+ "create table {} (i int) "
+ "tblproperties ('transactional'='true', "
+ "'transactional_properties'='insert_only')".format(tbl1))
+ self.client.execute(
+ "create table {} (i int) "
+ "tblproperties ('transactional'='true', "
+ "'transactional_properties'='insert_only')".format(tbl2))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ # Use the AcidTxn to manually control the transaction to allow us load the
tables
+ # before the commit or abort txn event for invalidating the tables by this
event.
+ acid = AcidTxn(self.hive_client)
+ txn_id = acid.open_txns()
+
+ # Triggers ALLOC_WRITE_ID events for later commit or abort txn.
+ acid.allocate_table_write_ids(txn_id, unique_database, tbl1_name)
+ acid.allocate_table_write_ids(txn_id, unique_database, tbl2_name)
+ EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+
+ # Load the tables for commit or abort txn event having something to
invalidate.
+ self.client.execute("describe {}".format(tbl1))
+ self.client.execute("describe {}".format(tbl2))
+
+ # Triggers commit or abort txn to invalidate tables in catch-up mode.
+ events_skipped_before =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ if is_commit:
+ acid.commit_txn(txn_id)
+ else:
+ acid.abort_txn(txn_id)
+ EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+ events_skipped_after =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ assert events_skipped_after == events_skipped_before + 1
+
+ # Verify the logs.
+ log_regex1 =\
+ r"{} .* Invalidated table {} due to event lag of .*s".format(ev_type,
tbl1)
+ self.assert_catalogd_log_contains("INFO", log_regex1, expected_count=1,
timeout_s=20)
+ log_regex2 =\
+ r"{} .* Invalidated table {} due to event lag of .*s".format(ev_type,
tbl2)
+ self.assert_catalogd_log_contains("INFO", log_regex2, expected_count=1,
timeout_s=20)
+
+ @pytest.mark.execute_serially
+ def test_catchup_mode_commit_txn(self, unique_database):
+ """
+ Tests that CommitTxn events cause the event processor to switch to
catch-up mode
+ to invalidate multiple involved tables when the processing delay exceeds
the
+ configured threshold.
+ """
+ self._run_catchup_mode_txn_test(
+ unique_database, is_commit=True, ev_type='COMMIT_TXN')
+
+ @pytest.mark.execute_serially
+ def test_catchup_mode_abort_txn(self, unique_database):
+ """
+ Tests that AbortTxn events cause the event processor to switch to catch-up
mode
+ to invalidate multiple involved tables when the processing delay exceeds
the
+ configured threshold.
+ """
+ self._run_catchup_mode_txn_test(
+ unique_database, is_commit=False, ev_type='ABORT_TXN')
+
+ @pytest.mark.execute_serially
+ def test_catchup_mode_iceberg(self, unique_database):
+ """
+ Tests that iceberg ingestion generates ALTER_TABLE events, when triggering
+ rapid insertions from Hive, it can trigger catch-up mode.
+ """
+ tbl_name = "catchup_mode_iceberg_tbl"
+ tbl = unique_database + "." + tbl_name
+
+ # Create Iceberg table.
+ self.client.execute(
+ "create table {} (i int, s string, p int) "
+ "partitioned by spec(p) stored as iceberg".format(tbl))
+ self.client.execute("insert into {} values (0, 'aaa', 0)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("describe {}".format(tbl))
+
+ # Trigger hive insertions to have external ALTER_TABLE events rapidly.
+ self.run_stmt_in_hive(
+ "insert into {0} values (1, 'bbb', 1); "
+ "insert into {0} values (2, 'ccc', 2); "
+ "insert into {0} values (3, 'ddd', 3); "
+ "insert into {0} values (4, 'eee', 4); "
+ "insert into {0} values (5, 'fff', 5);".format(tbl)
+ )
+ EventProcessorUtils.wait_for_event_processing(self, timeout=20)
+
+ # Verify logs.
+ log_regex = r"ALTER_TABLE .*{}.* Catch-up Mode: Skipping".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=6,
timeout_s=20)
+ log_regex =\
+ r"ALTER_TABLE .* Invalidated table {} due to event lag of
.*s".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=2,
timeout_s=20)
+
+ # Verify consistency.
+ res = self.client.execute("select count(*) from {}".format(tbl))
+ assert '6' in res.get_data()
+
+
[email protected]
+class
TestEventProcessingCatchupModeCustom(TestEventProcessingCustomConfigsBase):
+
+ @CustomClusterTestSuite.with_args(
+ catalogd_args="--hms_event_polling_interval_s=1 "
+ "--hms_event_catchup_threshold_s=2 "
+ "--enable_hierarchical_event_processing=false",
+ disable_log_buffering=True, cluster_size=1)
+ def test_catchup_mode_compute_stats(self, unique_database):
+ """
+ Tests ALTER_TABLE events lagging generated by a slow COMPUTE STATS.
+ """
+ tbl = unique_database + ".catchup_stats_tbl"
+ self.client.execute("create table {} (i int) partitioned by (p
int)".format(tbl))
+ self.client.execute("insert into {} partition (p=1) values
(1)".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.client.execute("describe {}".format(tbl))
+
+ # Trigger COMPUTE STATS to generate ALTER_TABLE events.
+ # The catalogd_update_stats_delay causes at least 3.5s event lag.
+ self.client.execute("set
debug_action='catalogd_update_stats_delay:SLEEP@3500'")
+ self.client.execute("compute stats {}".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ # Verify the logs.
+ log_regex = r"ALTER_TABLE.*Invalidated table {} due to event
lag".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=1,
timeout_s=20)
+ log_regex = r"ALTER_TABLE.*{}.* Catch-up Mode: Skipping".format(tbl)
+ self.assert_catalogd_log_contains("INFO", log_regex, expected_count=1,
timeout_s=20)