This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 46525bcd7c76eb1145a855f3706ece6fff380b8f
Author: Sai Hemanth Gantasala <saihema...@cloudera.com>
AuthorDate: Tue Jul 8 10:30:24 2025 -0700

    IMPALA-14082: Support batch processing of RELOAD events on same table
    
    Currently, RELOAD events of a partitioned table are processed one after
    the other. Processing them one by one acquires the table lock multiple
    times to load individual partitions in sequence. This also keeps the
    table version changing which impacts performance of coordinators in
    local-catalog mode - query planning needs retry to handle
    InconsistentMetadataFetchException due to table version changes.
    
    This patch handles the batch processing logic for RELOAD events on the
    same table by reusing the existing logic of BatchPartitionEvent. This
    implementation adds four new methods — canBeBatched(), addToBatchEvents(),
    getPartitionForBatching(), and getBatchEventType() (prerequisites to reuse
    the batching logic) — to the RELOAD event class.
    
    Testing:
    - Added an end-to-end test to verify the batching.
    
    Change-Id: Ie3e9a99b666a1c928ac2a136bded1e5420f77dab
    Reviewed-on: http://gerrit.cloudera.org:8080/23159
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 41 +++++++++++-
 .../apache/impala/service/CatalogOpExecutor.java   |  3 +-
 tests/custom_cluster/test_events_custom_configs.py | 74 ++++++++++++++++++++--
 3 files changed, 108 insertions(+), 10 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 834adcee6..6bd4aeebc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -136,6 +136,7 @@ public class MetastoreEvents {
     INSERT("INSERT"),
     INSERT_PARTITIONS("INSERT_PARTITIONS"),
     RELOAD("RELOAD"),
+    RELOAD_PARTITIONS("RELOAD_PARTITIONS"),
     ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
     COMMIT_TXN("COMMIT_TXN"),
     ABORT_TXN("ABORT_TXN"),
@@ -2991,8 +2992,9 @@ public class MetastoreEvents {
           partitions.add(event.getPartitionForBatching());
         }
         try {
-          if (baseEvent_ instanceof InsertEvent) {
-            // for insert event, always reload file metadata so that new files
+          if ((baseEvent_ instanceof InsertEvent) ||
+              (baseEvent_ instanceof ReloadEvent)) {
+            // for insert & reload events, always reload file metadata so that 
new files
             // are reflected in HdfsPartition
             reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, 
getEventDesc(),
                 true);
@@ -3332,6 +3334,41 @@ public class MetastoreEvents {
       }
     }
 
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      if (!(event instanceof ReloadEvent)) return false;
+      if (isOlderThanLastSyncEventId(event)) return false;
+      ReloadEvent reloadEvent = (ReloadEvent) event;
+      // make sure that the event is on the same table
+      if (!getFullyQualifiedTblName().equalsIgnoreCase(
+          reloadEvent.getFullyQualifiedTblName())) {
+        return false;
+      }
+      // we currently only batch partition level reload events
+      if (this.reloadPartition_ == null || reloadEvent.reloadPartition_ == 
null) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      if (!(event instanceof ReloadEvent)) return null;
+      BatchPartitionEvent<ReloadEvent> batchEvent = new BatchPartitionEvent<>(
+          this);
+      Preconditions.checkState(batchEvent.canBeBatched(event));
+      batchEvent.addToBatchEvents(event);
+      return batchEvent;
+    }
+
+    @Override
+    protected Partition getPartitionForBatching() { return reloadPartition_; }
+
+    @Override
+    protected MetastoreEventType getBatchEventType() {
+      return MetastoreEventType.RELOAD_PARTITIONS;
+    }
+
     private void processTableInvalidate() throws 
MetastoreNotificationException {
       Reference<Boolean> tblWasRemoved = new Reference<>();
       Reference<Boolean> dbWasAdded = new Reference<>();
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1ea95efc1..fb5640359 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7532,7 +7532,8 @@ public class CatalogOpExecutor {
               .getPartitionFromThriftPartitionSpec(partSpecList.get(i));
           if (partition != null) {
             HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
-            partBuilder.setLastRefreshEventId(eventIds.get(0));
+            // use last event id, so that batch partition events will not 
reloaded again
+            partBuilder.setLastRefreshEventId(eventIds.get(eventIds.size() - 
1));
             partitionChanged |= hdfsTbl.updatePartition(partBuilder);
           } else {
             LOG.warn("Partition {} no longer exists in table {}. It might be " 
+
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 7a0f7031d..1bbb0047f 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -17,6 +17,7 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import logging
+import os
 import pytest
 import re
 from os import getenv
@@ -570,19 +571,24 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
 
   @CustomClusterTestSuite.with_args(
       catalogd_args="--hms_event_polling_interval_s=5"
-                    " --enable_reload_events=true")
+                    " --enable_reload_events=true"
+                    " --enable_skipping_older_events=true")
   def test_refresh_invalidate_events(self, unique_database):
+    """The reload events generated by this test will be batched by
+    MetaStoreEvents#BatchPartitionEvent and BatchPartitionEvent#isOlderEvent() 
requires
+    'enable_skipping_older_events=true' to skip older reload events."""
     self.run_test_refresh_invalidate_events(unique_database, "reload_table")
 
   @CustomClusterTestSuite.with_args(
       catalogd_args="--hms_event_polling_interval_s=5"
                     " --enable_reload_events=true"
-                    " --enable_sync_to_latest_event_on_ddls=true")
+                    " --enable_sync_to_latest_event_on_ddls=true"
+                    " --enable_skipping_older_events=true")
   def test_refresh_invalidate_events_enable_sync_to_latest_events(self, 
unique_database):
     self.run_test_refresh_invalidate_events(unique_database, 
"reload_table_sync", True)
 
   def run_test_refresh_invalidate_events(self, unique_database, 
test_reload_table,
-    enable_sync_to_latest_event_on_ddls=False):
+    fire_reload_events_from_hive=False):
     """Test is to verify Impala-11808, refresh/invalidate commands should 
generate a
     Reload event in HMS and CatalogD's event processor should process this 
event.
     """
@@ -624,8 +630,10 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
         .format(unique_database, test_reload_table), 2)
     EventProcessorUtils.wait_for_event_processing(self)
 
-    if enable_sync_to_latest_event_on_ddls:
-      # Test to verify if older events are being skipped in event processor
+    if fire_reload_events_from_hive:
+      # Test to verify if older events from hive are being skipped in event 
processor.
+      # Firing 10 consecutive RELOAD events, the first one processes and 
updates the
+      # lastRefreshEventId, causing the remaining 9 to be ignored.
       data = FireEventRequestData()
       data.refreshEvent = True
       req = FireEventRequest(True, data)
@@ -633,7 +641,7 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       req.tableName = test_reload_table
       # table level reload events
       tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
-      for i in range(10):
+      for _ in range(10):
         self.hive_client.fire_listener_event(req)
       EventProcessorUtils.wait_for_event_processing(self)
       tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
@@ -641,9 +649,13 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       # partition level reload events
       EventProcessorUtils.wait_for_event_processing(self)
       part_events_skipped_before = EventProcessorUtils.get_num_skipped_events()
+      self.client.execute(":event_processor('pause')")
       req.partitionVals = ["2022"]
-      for i in range(10):
+      for _ in range(10):
         self.hive_client.fire_listener_event(req)
+      self.client.execute("refresh {}.{} partition(year=2022)"
+        .format(unique_database, test_reload_table))
+      self.client.execute(":event_processor('start')")
       EventProcessorUtils.wait_for_event_processing(self)
       part_events_skipped_after = EventProcessorUtils.get_num_skipped_events()
       assert part_events_skipped_after > part_events_skipped_before
@@ -796,6 +808,54 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     assert batch_events_2 == batch_events_1
     assert current_skipped_events - prev_skipped_events >= 24
 
+  
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_batch_reload_events(self, unique_database):
+    """Test to verify IMPALA-14082, adding batching logic for partitioned 
refresh events.
+    Before batching the events, each event is checked if the event id is 
greater than
+    table's lastSyncEventId then the event can be batched else it can be 
skipped."""
+    tbl = unique_database + ".batch_refresh_tbl"
+    self.client.execute(
+      "create table {} (i int) partitioned by(p int) stored as 
textfile".format(tbl))
+    self.client.execute("insert into {} partition(p) values (0,0),(1,1),(2,2)"
+        .format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+
+    def __get_fs_location(table_name):
+      return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, table_name)
+
+    batch_events_1 = EventProcessorUtils.get_int_metric("batch-events-created")
+    tbl_path = __get_fs_location("batch_refresh_tbl")
+    # new data for p=0, overwrite p=2 with p=1, delete data under p=1
+    self.filesystem_client.create_file(os.path.join(tbl_path + "p=0/",
+        "new_file.txt"), "4")
+    self.filesystem_client.delete_file_dir(tbl_path + "p=2/", recursive=True)
+    self.filesystem_client.make_dir(tbl_path + "p=2/")
+    self.filesystem_client.copy(tbl_path + "p=1/", tbl_path + "p=2/")
+    self.filesystem_client.delete_file_dir(tbl_path + "p=1/", recursive=True)
+    data = FireEventRequestData()
+    data.refreshEvent = True
+    req = FireEventRequest(True, data)
+    req.dbName = unique_database
+    req.tableName = "batch_refresh_tbl"
+    req.partitionVals = ["0"]
+    self.hive_client.fire_listener_event(req)
+    req.partitionVals = ["1"]
+    self.hive_client.fire_listener_event(req)
+    req.partitionVals = ["2"]
+    self.hive_client.fire_listener_event(req)
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_2 = EventProcessorUtils.get_int_metric("batch-events-created")
+    assert batch_events_2 == batch_events_1 + 1
+    # p=0 has two values 0, 4 and p=2 has value 1
+    result = self.execute_query(
+      "select * from {} order by i".format(tbl))
+    parsed_data = []
+    for line in result.get_data().strip().split('\n'):
+      row = [int(val) for val in line.strip().split('\t')]
+      parsed_data.append(row)
+    expected_data = [[0, 0], [1, 2], [4, 0]]
+    assert expected_data == parsed_data
+
   
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_commit_compaction_events(self, unique_database):
     """Test is to verify Impala-11626, commit compaction events triggered in 
HMS would

Reply via email to