This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 384a664f54f6fa32dc39d3368ad19fda225bff01
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Mon Jun 9 11:47:54 2025 -0700

    IMPALA-14107: Increment table version in fireReloadEvent
    
    test_reload_events_with_transient_partitions can get stuck when running
    in local catalog mode. From catalogd.INFO, it looks like the catalog
    keeps looking up a non-existent partition ID.
    
    In local catalog mode, a CatalogLookupStatus.PARTITION_NOT_FOUND
    response from CatalogD is followed by the Coordinator invalidating the
    table metadata cache and the table list of the database. However, it
    does not invalidate the partition list of the requested table. This
    causes the Coordinator to re-request the same partition ID, which no
    longer exists in CatalogD. This patch attempts to fix the issue on the
    CatalogD side by incrementing the table version in
    fireReloadEventAndUpdateRefreshEventId() if HdfsTable.updatePartition()
    detects any partition change. CatalogD will then send a new topic
    update that resets the Coordinator cache, and the Coordinator will
    retry the same partition request with an updated partition ID.
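
    In essence, the CatalogD-side flow becomes the following (a simplified
    sketch assembled from the hunks below, not the verbatim patch; the
    partBuilders loop actually lives in HdfsTable.updatePartitions()):

      // Reload the partitions named in the RELOAD event.
      boolean partitionChanged = false;
      for (HdfsPartition.Builder b : partBuilders) {
        // updatePartition() now reports whether it replaced the partition.
        partitionChanged |= hdfsTbl.updatePartition(b);
      }
      if (partitionChanged) {
        // Bump the table version so the next topic update resets the
        // Coordinator's cache and the stale partition ID gets re-fetched.
        tbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
      }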
    
    Reworded the InconsistentMetadataFetchException message to clarify
    which lookup_status failure occurred for which req.object_desc.
    
    This patch also tweaks HdfsTable.getPartialInfo() to directly gather
    all partitions when the Coordinator does not request specific
    partitions.
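
    Roughly, the lookup path now takes this shape (sketch only; the real
    per-ID loop and its PARTITION_NOT_FOUND early return are in the
    HdfsTable.java hunk below, and resolvePartitionIds() here is just a
    stand-in for that loop):

      Collection<HdfsPartition> requestedPartitions = null;
      if (wantPartitionInfo) {
        if (partIds == null) {
          // No explicit IDs: the caller wants every partition, so take
          // them directly instead of walking the key set.
          requestedPartitions = partitionMap_.values();
        } else {
          // Resolve each requested ID, failing fast with
          // PARTITION_NOT_FOUND if any ID is stale.
          requestedPartitions = resolvePartitionIds(partIds);
        }
      }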
    
    Testing:
    Split test_reload_events_with_transient_partitions into two setups:
    legacy catalog mode and local catalog mode. Confirmed that both tests
    pass.
    
    Change-Id: I1bc91023d46a0b57916202d2b64e2cf07b687b74
    Reviewed-on: http://gerrit.cloudera.org:8080/22956
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 49 ++++++++++++++--------
 .../impala/catalog/local/CatalogdMetaProvider.java |  4 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 15 ++++++-
 tests/custom_cluster/test_events_custom_configs.py | 23 +++++++---
 4 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 06bbcb3ca..7f6cf7f85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1026,12 +1026,15 @@ public class HdfsTable extends Table implements FeFsTable {
     }
   }
 
-  public void updatePartitions(List<HdfsPartition.Builder> partBuilders)
+  public boolean updatePartitions(List<HdfsPartition.Builder> partBuilders)
       throws CatalogException {
-    for (HdfsPartition.Builder p : partBuilders) updatePartition(p);
+    boolean partitionChanged = false;
+    for (HdfsPartition.Builder p : partBuilders) partitionChanged |= updatePartition(p);
+    return partitionChanged;
   }
 
-  public void updatePartition(HdfsPartition.Builder partBuilder) throws CatalogException {
+  public boolean updatePartition(HdfsPartition.Builder partBuilder)
+      throws CatalogException {
     HdfsPartition oldPartition = partBuilder.getOldInstance();
     Preconditions.checkNotNull(oldPartition,
         "Old partition instance should exist for updates");
@@ -1042,11 +1045,12 @@ public class HdfsTable extends Table implements FeFsTable {
     boolean partitionNotChanged = partBuilder.equalsToOriginal(oldPartition);
     LOG.trace("Partition {} {}", oldPartition.getName(),
         partitionNotChanged ? "changed" : "unchanged");
-    if (partitionNotChanged) return;
+    if (partitionNotChanged) return false;
     HdfsPartition newPartition = partBuilder.build();
     // Partition is reloaded and hence cache directives are not dropped.
     dropPartition(oldPartition, false);
     addPartition(newPartition);
+    return true;
   }
 
   /**
@@ -2290,10 +2294,25 @@ public class HdfsTable extends Table implements FeFsTable {
     }
 
     Collection<Long> partIds = req.table_info_selector.partition_ids;
-    if (partIds == null && wantPartitionInfo) {
-      // Caller specified at least one piece of partition info but didn't specify
-      // any partition IDs. That means they want the info for all partitions.
-      partIds = partitionMap_.keySet();
+    Collection<HdfsPartition> requestedPartitions = null;
+    if (wantPartitionInfo) {
+      if (partIds == null) {
+        // Caller specified at least one piece of partition info but didn't specify
+        // any partition IDs. That means they want the info for all partitions.
+        requestedPartitions = partitionMap_.values();
+      } else {
+        requestedPartitions = Lists.newArrayListWithCapacity(partIds.size());
+        for (long partId : partIds) {
+          HdfsPartition part = partitionMap_.get(partId);
+          if (part == null) {
+            LOG.warn(String.format(
+                "Missing partition ID: %s, Table: %s", partId, getFullName()));
+            return new TGetPartialCatalogObjectResponse().setLookup_status(
+                CatalogLookupStatus.PARTITION_NOT_FOUND);
+          }
+          requestedPartitions.add(part);
+        }
+      }
     }
 
     ValidWriteIdList reqWriteIdList = req.table_info_selector.valid_write_ids == null ?
@@ -2303,16 +2322,10 @@ public class HdfsTable extends Table implements FeFsTable {
     Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC);
     int numFilesFiltered = 0;
     int numFilesCollected = 0;
-    if (partIds != null) {
-      resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
-      for (long partId : partIds) {
-        HdfsPartition part = partitionMap_.get(partId);
-        if (part == null) {
-          LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId,
-              getFullName()));
-          return new TGetPartialCatalogObjectResponse().setLookup_status(
-              CatalogLookupStatus.PARTITION_NOT_FOUND);
-        }
+    if (requestedPartitions != null) {
+      resp.table_info.partitions =
+          Lists.newArrayListWithCapacity(requestedPartitions.size());
+      for (HdfsPartition part : requestedPartitions) {
         Pair<TPartialPartitionInfo, Integer> partInfoStatus =
             part.getPartialPartitionInfo(req, reqWriteIdList);
         if (partInfoStatus.second != null) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index fa40fdddc..89779d4b0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -485,8 +485,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DATA_SOURCE_NOT_FOUND:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(resp.lookup_status,
-            String.format("Fetching %s failed: %s. Could not find %s",
-                req.object_desc.type, resp.lookup_status, req.object_desc));
+            String.format("Fetching %s failed: %s for %s", 
req.object_desc.type,
+                resp.lookup_status, req.object_desc));
       default: break;
     }
     Preconditions.checkState(resp.lookup_status == CatalogLookupStatus.OK);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1534f271a..d83070c35 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7243,6 +7243,12 @@ public class CatalogOpExecutor {
         }
         addDbToCatalogUpdate(addedDb, req.header.want_minimal_response, resp.getResult());
       }
+      // Here, we are sending an old snapshot if the table is modified again
+      // in fireReloadEventAndUpdateRefreshEventId(). This is OK since the old snapshot
+      // works as the new one in coordinator side. To be specific:
+      // - in local catalog mode, only the table name is used to evict stale cache items.
+      // - in the legacy catalog mode, the old snapshot has the same partition metadata
+      //   and file metadata.
       resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
     } else if (req.isAuthorization()) {
       AuthorizationDelta authzDelta = catalog_.refreshAuthorization(false);
@@ -7305,7 +7311,10 @@ public class CatalogOpExecutor {
             tbl.getFullName());
         return;
       }
+
+      // tbl lock is held at this point.
       if (partSpecList != null) {
+        boolean partitionChanged = false;
         for (int i = 0; i < partSpecList.size(); ++i) {
           HdfsTable hdfsTbl = (HdfsTable) tbl;
           HdfsPartition partition = hdfsTbl
@@ -7313,7 +7322,7 @@ public class CatalogOpExecutor {
           if (partition != null) {
             HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
             partBuilder.setLastRefreshEventId(eventIds.get(0));
-            hdfsTbl.updatePartition(partBuilder);
+            partitionChanged |= hdfsTbl.updatePartition(partBuilder);
           } else {
             LOG.warn("Partition {} no longer exists in table {}. It might be " 
+
                     "dropped by a concurrent operation.",
@@ -7321,6 +7330,10 @@ public class CatalogOpExecutor {
                 hdfsTbl.getFullName());
           }
         }
+        if (partitionChanged) {
+          // Set catalog version of the table to a new one.
+          tbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+        }
       } else {
         tbl.setLastRefreshEventId(eventIds.get(0));
       }
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 6897424d1..bcf86c9fc 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -658,8 +658,18 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
   @CustomClusterTestSuite.with_args(
-    catalogd_args="--enable_reload_events=true")
-  def test_reload_events_with_transient_partitions(self, unique_database):
+      impalad_args="--use_local_catalog=false",
+      catalogd_args="--catalog_topic_mode=full --enable_reload_events=true")
+  def test_reload_events_with_transient_partitions_legacy_catalog(self, unique_database):
+    self.run_test_reload_events_with_transient_partitions(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal --enable_reload_events=true")
+  def test_reload_events_with_transient_partitions_local_catalog(self, unique_database):
+    self.run_test_reload_events_with_transient_partitions(unique_database)
+
+  def run_test_reload_events_with_transient_partitions(self, unique_database):
     tbl = unique_database + ".tbl"
     create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl)
     add_part_stmt = "alter table {} add if not exists 
partition(p=0)".format(tbl)
@@ -676,9 +686,10 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       self.execute_query(drop_part_stmt)
       refresh_state = self.client.wait_for_any_impala_state(
         refresh_handle, end_states, 10)
-      assert refresh_state == FINISHED, \
-          "REFRESH state: {}. Error log: {}".format(
-            refresh_state, self.client.get_log(refresh_handle))
+      is_finished = (refresh_state == FINISHED)
+      error_log = None if is_finished else self.client.get_log(refresh_handle)
+      self.client.close_query(refresh_handle)
+      assert is_finished, "REFRESH is error. Error log: {}".format(error_log)
       self.execute_query(add_part_stmt)
       refresh_handle = self.client.execute_async(refresh_stmt)
 
@@ -1630,6 +1641,7 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       assert curr_drop_table_metric == prev_drop_table_metric + 1
       assert curr_create_table_metric == prev_create_table_metric + 1
 
+
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
   """This class contains tests that exercise the event processing mechanism in 
the
@@ -1663,6 +1675,7 @@ class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
     self._run_self_events_test(unique_database, vector.get_value('exec_option'),
                                use_impala=True)
 
+
 @SkipIfFS.hive
 class TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
 
