This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 46525bcd7c76eb1145a855f3706ece6fff380b8f Author: Sai Hemanth Gantasala <saihema...@cloudera.com> AuthorDate: Tue Jul 8 10:30:24 2025 -0700 IMPALA-14082: Support batch processing of RELOAD events on same table Currently, RELOAD events of partitioned table are processed one after the other. Processing them one by one acquires the table lock multiple times to load individual partitions in sequence. This also keeps the table version changing which impacts performance of coordinators in local-catalog mode - query planning needs retry to handle InconsistentMetadataFetchException due to table version changes. This patch handles the batch processing logic of RELOAD events on same table by reusing the existing logic of BatchPartitionEvent. This implementation adds four new methods canBeBatched(), addToBatchEvents(), getPartitionForBatching(), getBatchEventType() (pre-requisites to reuse batching logic) to the RELOAD event class. Testing: - Added an end-to-end test to verify the batching. Change-Id: Ie3e9a99b666a1c928ac2a136bded1e5420f77dab Reviewed-on: http://gerrit.cloudera.org:8080/23159 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../impala/catalog/events/MetastoreEvents.java | 41 +++++++++++- .../apache/impala/service/CatalogOpExecutor.java | 3 +- tests/custom_cluster/test_events_custom_configs.py | 74 ++++++++++++++++++++-- 3 files changed, 108 insertions(+), 10 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 834adcee6..6bd4aeebc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -136,6 +136,7 @@ public class MetastoreEvents { INSERT("INSERT"), INSERT_PARTITIONS("INSERT_PARTITIONS"), RELOAD("RELOAD"), + 
RELOAD_PARTITIONS("RELOAD_PARTITIONS"), ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"), COMMIT_TXN("COMMIT_TXN"), ABORT_TXN("ABORT_TXN"), @@ -2991,8 +2992,9 @@ public class MetastoreEvents { partitions.add(event.getPartitionForBatching()); } try { - if (baseEvent_ instanceof InsertEvent) { - // for insert event, always reload file metadata so that new files + if ((baseEvent_ instanceof InsertEvent) || + (baseEvent_ instanceof ReloadEvent)) { + // for insert & reload events, always reload file metadata so that new files // are reflected in HdfsPartition reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, getEventDesc(), true); @@ -3332,6 +3334,41 @@ public class MetastoreEvents { } } + @Override + public boolean canBeBatched(MetastoreEvent event) { + if (!(event instanceof ReloadEvent)) return false; + if (isOlderThanLastSyncEventId(event)) return false; + ReloadEvent reloadEvent = (ReloadEvent) event; + // make sure that the event is on the same table + if (!getFullyQualifiedTblName().equalsIgnoreCase( + reloadEvent.getFullyQualifiedTblName())) { + return false; + } + // we currently only batch partition level reload events + if (this.reloadPartition_ == null || reloadEvent.reloadPartition_ == null) { + return false; + } + return true; + } + + @Override + public MetastoreEvent addToBatchEvents(MetastoreEvent event) { + if (!(event instanceof ReloadEvent)) return null; + BatchPartitionEvent<ReloadEvent> batchEvent = new BatchPartitionEvent<>( + this); + Preconditions.checkState(batchEvent.canBeBatched(event)); + batchEvent.addToBatchEvents(event); + return batchEvent; + } + + @Override + protected Partition getPartitionForBatching() { return reloadPartition_; } + + @Override + protected MetastoreEventType getBatchEventType() { + return MetastoreEventType.RELOAD_PARTITIONS; + } + private void processTableInvalidate() throws MetastoreNotificationException { Reference<Boolean> tblWasRemoved = new Reference<>(); Reference<Boolean> dbWasAdded = new Reference<>(); 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 1ea95efc1..fb5640359 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -7532,7 +7532,8 @@ public class CatalogOpExecutor { .getPartitionFromThriftPartitionSpec(partSpecList.get(i)); if (partition != null) { HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); - partBuilder.setLastRefreshEventId(eventIds.get(0)); + // use last event id, so that batch partition events will not be reloaded again + partBuilder.setLastRefreshEventId(eventIds.get(eventIds.size() - 1)); partitionChanged |= hdfsTbl.updatePartition(partBuilder); } else { LOG.warn("Partition {} no longer exists in table {}. It might be " + diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 7a0f7031d..1bbb0047f 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -17,6 +17,7 @@ from __future__ import absolute_import, division, print_function from builtins import range import logging +import os import pytest import re from os import getenv @@ -570,19 +571,24 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): @CustomClusterTestSuite.with_args( catalogd_args="--hms_event_polling_interval_s=5" - " --enable_reload_events=true") + " --enable_reload_events=true" + " --enable_skipping_older_events=true") def test_refresh_invalidate_events(self, unique_database): + """The reload events generated by this test will be batched by + MetastoreEvents#BatchPartitionEvent and BatchPartitionEvent#isOlderEvent() requires + 'enable_skipping_older_events=true' to skip older reload events.""" self.run_test_refresh_invalidate_events(unique_database, "reload_table") 
@CustomClusterTestSuite.with_args( catalogd_args="--hms_event_polling_interval_s=5" " --enable_reload_events=true" - " --enable_sync_to_latest_event_on_ddls=true") + " --enable_sync_to_latest_event_on_ddls=true" + " --enable_skipping_older_events=true") def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database): self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True) def run_test_refresh_invalidate_events(self, unique_database, test_reload_table, - enable_sync_to_latest_event_on_ddls=False): + fire_reload_events_from_hive=False): """Test is to verify Impala-11808, refresh/invalidate commands should generate a Reload event in HMS and CatalogD's event processor should process this event. """ @@ -624,8 +630,10 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): .format(unique_database, test_reload_table), 2) EventProcessorUtils.wait_for_event_processing(self) - if enable_sync_to_latest_event_on_ddls: - # Test to verify if older events are being skipped in event processor + if fire_reload_events_from_hive: + # Test to verify if older events from hive are being skipped in event processor. + # Firing 10 consecutive RELOAD events, the first one processes and updates the + # lastRefreshEventId, causing the remaining 9 to be ignored. 
data = FireEventRequestData() data.refreshEvent = True req = FireEventRequest(True, data) @@ -633,7 +641,7 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): req.tableName = test_reload_table # table level reload events tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events() - for i in range(10): + for _ in range(10): self.hive_client.fire_listener_event(req) EventProcessorUtils.wait_for_event_processing(self) tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events() @@ -641,9 +649,13 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): # partition level reload events EventProcessorUtils.wait_for_event_processing(self) part_events_skipped_before = EventProcessorUtils.get_num_skipped_events() + self.client.execute(":event_processor('pause')") req.partitionVals = ["2022"] - for i in range(10): + for _ in range(10): self.hive_client.fire_listener_event(req) + self.client.execute("refresh {}.{} partition(year=2022)" + .format(unique_database, test_reload_table)) + self.client.execute(":event_processor('start')") EventProcessorUtils.wait_for_event_processing(self) part_events_skipped_after = EventProcessorUtils.get_num_skipped_events() assert part_events_skipped_after > part_events_skipped_before @@ -796,6 +808,54 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): assert batch_events_2 == batch_events_1 assert current_skipped_events - prev_skipped_events >= 24 + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") + def test_batch_reload_events(self, unique_database): + """Test to verify IMPALA-14082, adding batching logic for partitioned refresh events. 
+ Before batching the events, each event is checked if the event id is greater than + table's lastSyncEventId then the event can be batched else it can be skipped.""" + tbl = unique_database + ".batch_refresh_tbl" + self.client.execute( + "create table {} (i int) partitioned by(p int) stored as textfile".format(tbl)) + self.client.execute("insert into {} partition(p) values (0,0),(1,1),(2,2)" + .format(tbl)) + EventProcessorUtils.wait_for_event_processing(self) + + def __get_fs_location(table_name): + return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, table_name) + + batch_events_1 = EventProcessorUtils.get_int_metric("batch-events-created") + tbl_path = __get_fs_location("batch_refresh_tbl") + # new data for p=0, overwrite p=2 with p=1, delete data under p=1 + self.filesystem_client.create_file(os.path.join(tbl_path + "p=0/", + "new_file.txt"), "4") + self.filesystem_client.delete_file_dir(tbl_path + "p=2/", recursive=True) + self.filesystem_client.make_dir(tbl_path + "p=2/") + self.filesystem_client.copy(tbl_path + "p=1/", tbl_path + "p=2/") + self.filesystem_client.delete_file_dir(tbl_path + "p=1/", recursive=True) + data = FireEventRequestData() + data.refreshEvent = True + req = FireEventRequest(True, data) + req.dbName = unique_database + req.tableName = "batch_refresh_tbl" + req.partitionVals = ["0"] + self.hive_client.fire_listener_event(req) + req.partitionVals = ["1"] + self.hive_client.fire_listener_event(req) + req.partitionVals = ["2"] + self.hive_client.fire_listener_event(req) + EventProcessorUtils.wait_for_event_processing(self) + batch_events_2 = EventProcessorUtils.get_int_metric("batch-events-created") + assert batch_events_2 == batch_events_1 + 1 + # p=0 has two values 0, 4 and p=2 has value 1 + result = self.execute_query( + "select * from {} order by i".format(tbl)) + parsed_data = [] + for line in result.get_data().strip().split('\n'): + row = [int(val) for val in line.strip().split('\t')] + parsed_data.append(row) + expected_data = [[0, 
0], [1, 2], [4, 0]] + assert expected_data == parsed_data + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") def test_commit_compaction_events(self, unique_database): """Test is to verify Impala-11626, commit compaction events triggered in HMS would