This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 384a664f54f6fa32dc39d3368ad19fda225bff01
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Mon Jun 9 11:47:54 2025 -0700

    IMPALA-14107: Increment table version in fireReloadEvent

    test_reload_events_with_transient_partitions can get stuck when running
    in local catalog mode. From catalogd.INFO, it looks like the catalog
    keeps looking for a non-existent partition id.

    In local catalog mode, a CatalogLookupStatus.PARTITION_NOT_FOUND
    response from CatalogD is followed by the Coordinator invalidating the
    table metadata cache and the table list of the database. However, the
    Coordinator does not invalidate the partition list of the requested
    table. This causes the Coordinator to re-request the same partition id
    that no longer exists in CatalogD.

    This patch attempts to fix the issue from the CatalogD side by
    incrementing the table version in fireReloadEventAndUpdateRefreshEventId
    if HdfsTable.updatePartition() detects any partition change. CatalogD
    will then send a new topic update that resets the Coordinator cache,
    and the Coordinator will retry the same partition request with an
    updated partition ID.

    Reworded the InconsistentMetadataFetchException message to clarify the
    lookup_status failure vs the req.object_desc.

    This patch also tweaks HdfsTable.getPartialInfo() to directly gather
    all partitions when the Coordinator does not request specific
    partitions.

    Testing:
    Split test_reload_events_with_transient_partitions into two setups:
    legacy catalog mode and local catalog mode. Confirmed that both tests
    pass.

    Change-Id: I1bc91023d46a0b57916202d2b64e2cf07b687b74
    Reviewed-on: http://gerrit.cloudera.org:8080/22956
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
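To make the failure mode concrete before the diff: the sketch below is an illustration in plain Java (all names are hypothetical, none of this is Impala code). A client caches the partition id it resolved at some table version; the server then drops and re-adds the partition under a new id. If the server's table version does not change, a NOT_FOUND answer alone never refreshes the cached id, so the client retries the dead id indefinitely. Bumping the version on any real partition change, which is what this patch does in CatalogD, is the signal that breaks the loop.

import java.util.HashMap;
import java.util.Map;

public class StaleIdLoop {
  // Hypothetical catalogd-side state: live partitions keyed by id, plus a
  // table version that should change whenever the partition set changes.
  static long tableVersion = 1;
  static final Map<Long, String> partitions = new HashMap<>();

  // Hypothetical coordinator-side cache: the partition id it resolved,
  // tagged with the table version it saw at resolution time.
  static long cachedVersion;
  static long cachedPartId;

  public static void main(String[] args) {
    partitions.put(1L, "p=0");
    cachedVersion = tableVersion;
    cachedPartId = 1L;

    // A concurrent DROP/ADD PARTITION cycle replaces id 1 with id 2 on the
    // server. Without the fix, tableVersion stays unchanged.
    partitions.remove(1L);
    partitions.put(2L, "p=0");

    for (int attempt = 1; attempt <= 3; ++attempt) {
      if (cachedVersion != tableVersion) {
        // A version change is the only signal that refreshes the cached id.
        cachedVersion = tableVersion;
        cachedPartId = partitions.keySet().iterator().next();
      }
      String part = partitions.get(cachedPartId);
      if (part != null) {
        System.out.println("attempt " + attempt + ": resolved partition " + part);
        return;
      }
      System.out.println("attempt " + attempt + ": PARTITION_NOT_FOUND for id "
          + cachedPartId);
      // The fix: a real partition change bumps the version (here after the
      // second failed attempt, standing in for the catalogd-side reload).
      if (attempt == 2) ++tableVersion;
    }
  }
}

Run as-is, the sketch prints PARTITION_NOT_FOUND twice and then resolves on the third attempt, once the version bump forces it to refetch the id list.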
"changed" : "unchanged"); - if (partitionNotChanged) return; + if (partitionNotChanged) return false; HdfsPartition newPartition = partBuilder.build(); // Partition is reloaded and hence cache directives are not dropped. dropPartition(oldPartition, false); addPartition(newPartition); + return true; } /** @@ -2290,10 +2294,25 @@ public class HdfsTable extends Table implements FeFsTable { } Collection<Long> partIds = req.table_info_selector.partition_ids; - if (partIds == null && wantPartitionInfo) { - // Caller specified at least one piece of partition info but didn't specify - // any partition IDs. That means they want the info for all partitions. - partIds = partitionMap_.keySet(); + Collection<HdfsPartition> requestedPartitions = null; + if (wantPartitionInfo) { + if (partIds == null) { + // Caller specified at least one piece of partition info but didn't specify + // any partition IDs. That means they want the info for all partitions. + requestedPartitions = partitionMap_.values(); + } else { + requestedPartitions = Lists.newArrayListWithCapacity(partIds.size()); + for (long partId : partIds) { + HdfsPartition part = partitionMap_.get(partId); + if (part == null) { + LOG.warn(String.format( + "Missing partition ID: %s, Table: %s", partId, getFullName())); + return new TGetPartialCatalogObjectResponse().setLookup_status( + CatalogLookupStatus.PARTITION_NOT_FOUND); + } + requestedPartitions.add(part); + } + } } ValidWriteIdList reqWriteIdList = req.table_info_selector.valid_write_ids == null ? @@ -2303,16 +2322,10 @@ public class HdfsTable extends Table implements FeFsTable { Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC); int numFilesFiltered = 0; int numFilesCollected = 0; - if (partIds != null) { - resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size()); - for (long partId : partIds) { - HdfsPartition part = partitionMap_.get(partId); - if (part == null) { - LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId, - getFullName())); - return new TGetPartialCatalogObjectResponse().setLookup_status( - CatalogLookupStatus.PARTITION_NOT_FOUND); - } + if (requestedPartitions != null) { + resp.table_info.partitions = + Lists.newArrayListWithCapacity(requestedPartitions.size()); + for (HdfsPartition part : requestedPartitions) { Pair<TPartialPartitionInfo, Integer> partInfoStatus = part.getPartialPartitionInfo(req, reqWriteIdList); if (partInfoStatus.second != null) { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index fa40fdddc..89779d4b0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -485,8 +485,8 @@ public class CatalogdMetaProvider implements MetaProvider { case DATA_SOURCE_NOT_FOUND: invalidateCacheForObject(req.object_desc); throw new InconsistentMetadataFetchException(resp.lookup_status, - String.format("Fetching %s failed: %s. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index fa40fdddc..89779d4b0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -485,8 +485,8 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DATA_SOURCE_NOT_FOUND:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(resp.lookup_status,
-            String.format("Fetching %s failed: %s. Could not find %s",
-                req.object_desc.type, resp.lookup_status, req.object_desc));
+            String.format("Fetching %s failed: %s for %s", req.object_desc.type,
+                resp.lookup_status, req.object_desc));
       default: break;
     }
     Preconditions.checkState(resp.lookup_status == CatalogLookupStatus.OK);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1534f271a..d83070c35 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7243,6 +7243,12 @@ public class CatalogOpExecutor {
         }
         addDbToCatalogUpdate(addedDb, req.header.want_minimal_response, resp.getResult());
       }
+      // Here, we are sending an old snapshot if the table is modified again
+      // in fireReloadEventAndUpdateRefreshEventId(). This is OK since the old snapshot
+      // works as the new one in coordinator side. To be specific:
+      // - in local catalog mode, only the table name is used to evict stale cache items.
+      // - in the legacy catalog mode, the old snapshot has the same partition metadata
+      //   and file metadata.
       resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
     } else if (req.isAuthorization()) {
       AuthorizationDelta authzDelta = catalog_.refreshAuthorization(false);
@@ -7305,7 +7311,10 @@ public class CatalogOpExecutor {
           tbl.getFullName());
       return;
     }
+
+    // tbl lock is held at this point.
     if (partSpecList != null) {
+      boolean partitionChanged = false;
       for (int i = 0; i < partSpecList.size(); ++i) {
         HdfsTable hdfsTbl = (HdfsTable) tbl;
         HdfsPartition partition = hdfsTbl
@@ -7313,7 +7322,7 @@
         if (partition != null) {
           HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
           partBuilder.setLastRefreshEventId(eventIds.get(0));
-          hdfsTbl.updatePartition(partBuilder);
+          partitionChanged |= hdfsTbl.updatePartition(partBuilder);
         } else {
           LOG.warn("Partition {} no longer exists in table {}. It might be "
               + "dropped by a concurrent operation.",
@@ -7321,6 +7330,10 @@ public class CatalogOpExecutor {
               hdfsTbl.getFullName());
         }
       }
+      if (partitionChanged) {
+        // Set catalog version of the table to a new one.
+        tbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+      }
     } else {
       tbl.setLastRefreshEventId(eventIds.get(0));
     }
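The CatalogOpExecutor hunks above follow a bump-only-on-real-change pattern: the boolean results of updatePartition() are OR-accumulated across the partition list, and the catalog version is incremented at most once, so a reload that changed nothing publishes no new topic update. A condensed sketch of the idea, with stand-in names rather than the real classes:

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class ReloadVersionBump {
  private final AtomicLong catalogVersion = new AtomicLong(1);

  // Stand-in for HdfsTable.updatePartition(): returns whether the rebuilt
  // partition actually differs from the old instance.
  private boolean updatePartition(boolean changed) {
    return changed;
  }

  // OR-accumulate per-partition results and bump the version at most once.
  long reloadPartitions(List<Boolean> perPartitionChange) {
    boolean partitionChanged = false;
    for (boolean changed : perPartitionChange) {
      partitionChanged |= updatePartition(changed);
    }
    if (partitionChanged) {
      // Only a real change warrants a new version (and thus a topic update).
      return catalogVersion.incrementAndGet();
    }
    return catalogVersion.get();
  }
}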
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 6897424d1..bcf86c9fc 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -658,8 +658,18 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
   @CustomClusterTestSuite.with_args(
-    catalogd_args="--enable_reload_events=true")
-  def test_reload_events_with_transient_partitions(self, unique_database):
+    impalad_args="--use_local_catalog=false",
+    catalogd_args="--catalog_topic_mode=full --enable_reload_events=true")
+  def test_reload_events_with_transient_partitions_legacy_catalog(self, unique_database):
+    self.run_test_reload_events_with_transient_partitions(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal --enable_reload_events=true")
+  def test_reload_events_with_transient_partitions_local_catalog(self, unique_database):
+    self.run_test_reload_events_with_transient_partitions(unique_database)
+
+  def run_test_reload_events_with_transient_partitions(self, unique_database):
     tbl = unique_database + ".tbl"
     create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl)
     add_part_stmt = "alter table {} add if not exists partition(p=0)".format(tbl)
@@ -676,9 +686,10 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
       self.execute_query(drop_part_stmt)
       refresh_state = self.client.wait_for_any_impala_state(
           refresh_handle, end_states, 10)
-      assert refresh_state == FINISHED, \
-          "REFRESH state: {}. Error log: {}".format(
-              refresh_state, self.client.get_log(refresh_handle))
+      is_finished = (refresh_state == FINISHED)
+      error_log = None if is_finished else self.client.get_log(refresh_handle)
+      self.client.close_query(refresh_handle)
+      assert is_finished, "REFRESH is error. Error log: {}".format(error_log)
 
       self.execute_query(add_part_stmt)
       refresh_handle = self.client.execute_async(refresh_stmt)
@@ -1630,6 +1641,7 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     assert curr_drop_table_metric == prev_drop_table_metric + 1
     assert curr_create_table_metric == prev_create_table_metric + 1
 
+
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
   """This class contains tests that exercise the event processing mechanism in the
@@ -1663,6 +1675,7 @@ class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
     self._run_self_events_test(unique_database, vector.get_value('exec_option'),
         use_impala=True)
 
+
 @SkipIfFS.hive
 class TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
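For the other half of the protocol, the commit message assumes the Coordinator reacts to an inconsistent fetch by invalidating its cache and retrying. The helper below is only a sketch of that bounded fetch-invalidate-retry shape, with hypothetical names; it is not Impala's actual InconsistentMetadataFetchException handling. The version bump added in this patch is what guarantees the retry eventually sees fresh partition ids instead of looping forever.

class BoundedMetadataRetry {
  // Hypothetical stand-in for the coordinator's inconsistent-fetch signal.
  static class InconsistentMetadataException extends RuntimeException {}

  interface Fetcher<T> {
    T fetch() throws InconsistentMetadataException;
  }

  // Fetch metadata, invalidating the local cache and retrying when the
  // catalog reports an inconsistent lookup; give up after maxAttempts so a
  // permanently missing object cannot spin forever (the bug fixed here was
  // exactly such an unbounded loop).
  static <T> T fetchWithRetry(Fetcher<T> fetcher, Runnable invalidateCache,
      int maxAttempts) {
    for (int attempt = 1; ; ++attempt) {
      try {
        return fetcher.fetch();
      } catch (InconsistentMetadataException e) {
        if (attempt >= maxAttempts) throw e;
        invalidateCache.run();  // evict stale entries before the next try
      }
    }
  }
}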