This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit be16a02fa8f98da09d572d8250363896ae10b9e7 Author: Riza Suminto <[email protected]> AuthorDate: Thu May 8 22:39:15 2025 -0700 IMPALA-13850 (part 2): Fix bug found by test_restart_services.py This patch stabilizes test_restart_catalogd_with_local_catalog in test_restart_services.py after the first part of IMPALA-13850 was merged. IMPALA-13850 (part 1) makes local catalog mode send a statestore update twice: the first announces its availability and service id, while the second is the full topic update. There is a short window in which CatalogD accepts a getCatalogObject() request before the very first CatalogServiceCatalog.reset() has been initiated and has obtained the write lock. When such a request goes through, it might see an empty catalog, which results in query failures because the db/table does not exist. This patch blocks CatalogServiceThriftIf.AcceptRequest() until CatalogServiceCatalog.reset() has been initiated. Catalog version 100 is used to signal that the initial reset has begun. Later in part 3, when we implement in-place metadata cache reset, AcceptRequest() can unblock faster when reset() releases the write lock in-between catalog cache initialization. Testing: - Looped test_restart_catalogd_with_local_catalog 100 times; it passed every time. 
Change-Id: I97f6f692506de0bbf2e1445f83bed824dc8298fd Reviewed-on: http://gerrit.cloudera.org:8080/22844 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 17 +++++++++++++++++ fe/src/main/java/org/apache/impala/catalog/Catalog.java | 3 +++ .../apache/impala/catalog/CatalogServiceCatalog.java | 7 ++++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 63d6c0587..875853e0b 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -329,6 +329,9 @@ const string HADOOP_VARZ_TEMPLATE = "hadoop-varz.tmpl"; const string HADOOP_VARZ_WEB_PAGE = "/hadoop-varz"; const int REFRESH_METRICS_INTERVAL_MS = 1000; +// Catalog version that signal that the first metadata reset has begun. +// This should match Catalog.CATALOG_VERSION_AFTER_FIRST_RESET +const int MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST = 100; // Implementation for the CatalogService thrift interface. class CatalogServiceThriftIf : public CatalogServiceIf { @@ -547,6 +550,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf { private: CatalogServer* catalog_server_; string server_address_; + bool has_initiated_first_reset_ = false; // Check if catalog protocols are compatible between client and catalog server. // Return Status::OK() if the protocols are compatible and catalog server is active. 
@@ -559,6 +563,19 @@ class CatalogServiceThriftIf : public CatalogServiceIf { status = Status(Substitute("Request for Catalog service is rejected since " "catalogd $0 is in standby mode", server_address_)); } + while (status.ok() && !has_initiated_first_reset_) { + long current_catalog_version = 0; + status = catalog_server_->catalog()->GetCatalogVersion(&current_catalog_version); + if (!status.ok()) break; + if (current_catalog_version >= MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST) { + has_initiated_first_reset_ = true; + } else { + VLOG(1) << "Catalog is not initialized yet. Waiting for catalog version (" + << current_catalog_version << ") to be >= " + << MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST; + SleepForMs(100); + } + } return status; } }; diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java index cf90bc9b7..4e0655a3d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java @@ -78,6 +78,9 @@ import com.google.common.base.Preconditions; public abstract class Catalog implements AutoCloseable { // Initial catalog version and ID. public final static long INITIAL_CATALOG_VERSION = 0L; + // Catalog version that signal that the first metadata reset has begun. + // This should match MIN_CATALOG_VERSION_TO_ACCEPT_REQUEST. 
+ public final static long CATALOG_VERSION_AFTER_FIRST_RESET = 100L; public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L); public static final String DEFAULT_DB = "default"; diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 0a7227d7f..f1373cd04 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2297,7 +2297,12 @@ public class CatalogServiceCatalog extends Catalog { // In case of an empty new catalog, the version should still change to reflect the // reset operation itself and to unblock impalads by making the catalog version > // INITIAL_CATALOG_VERSION. See Frontend.waitForCatalog() - ++catalogVersion_; + if (catalogVersion_ < Catalog.CATALOG_VERSION_AFTER_FIRST_RESET) { + catalogVersion_ = Catalog.CATALOG_VERSION_AFTER_FIRST_RESET; + LOG.info("First reset initiated. Version: " + catalogVersion_); + } else { + ++catalogVersion_; + } // Update data source, db and table metadata try {
