This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 64abca481ffefcc67cee9e8c20de51e68238be95
Author: stiga-huang <huangquanl...@gmail.com>
AuthorDate: Tue Jul 15 16:21:09 2025 +0800

    IMPALA-14227: In HA failover, passive catalogd should apply pending HMS events before becoming active
    
    After IMPALA-14074, the passive catalogd can have a warmed up metadata
    cache during failover (with catalogd_ha_reset_metadata_on_failover=false
    and a non-empty warmup_tables_config_file). However, it could still use
    a stale metadata cache when some pending HMS events generated by the
    previous active catalogd are not applied yet.
    
    This patch adds a wait during HA failover to ensure that all HMS events
    generated before the failover are applied on the new active catalogd. The
    timeout is configured by a new flag, catalogd_ha_failover_catchup_timeout_s,
    which defaults to 300 (5 minutes). When the wait times out, catalogd falls
    back to resetting all metadata by default. Users can choose whether to
    reset or to continue using the current cache via another new flag,
    catalogd_ha_reset_metadata_on_failover_catchup_timeout.
    
    Since the passive catalogd depends on HMS event processing to keep its
    metadata up-to-date with the active catalogd, this patch adds a startup
    check that exits with an error when catalogd_ha_reset_metadata_on_failover
    is set to false and hms_event_polling_interval_s <= 0.
    
    This patch also makes catalogd_ha_reset_metadata_on_failover a
    non-hidden flag so it's shown in the /varz web page.
    
    Tests:
     - Ran test_warmed_up_metadata_after_failover 200 times. Without the
       fix, it usually fails in several runs.
     - Added new tests for the new flags.
    
    Change-Id: Icf4fcb0e27c14197f79625749949b47c033a5f31
    Reviewed-on: http://gerrit.cloudera.org:8080/23174
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 107 +++++++++++++++++++--
 be/src/catalog/catalog-server.h                    |   6 ++
 be/src/catalog/catalog.cc                          |   6 +-
 be/src/catalog/catalog.h                           |   3 +-
 be/src/common/init.cc                              |  10 ++
 common/thrift/JniCatalog.thrift                    |   7 ++
 .../impala/catalog/CatalogServiceCatalog.java      |   6 +-
 .../catalog/events/ExternalEventsProcessor.java    |   4 +-
 .../catalog/events/MetastoreEventsProcessor.java   |  15 ++-
 .../impala/catalog/events/NoOpEventProcessor.java  |   4 +-
 .../java/org/apache/impala/service/JniCatalog.java |  14 ++-
 .../events/MetastoreEventsProcessorTest.java       |  10 +-
 tests/custom_cluster/test_catalogd_ha.py           |  46 ++++++++-
 13 files changed, 214 insertions(+), 24 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 4b522b343..f74e1419b 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -171,11 +171,24 @@ DEFINE_int32(catalog_operation_log_size, 100, "Number of 
catalog operation log r
     "to retain in catalogd. If -1, the operation log has unbounded size.");
 
 // The standby catalogd may have stale metadata for some reason, like event 
processor
-// could have hung or could be just behind in processing events. Also the 
standby
-// catalogd doesn't get invalidate requests from coordinators so we should 
probably
+// could have hung or could be just behind in processing events. So we should 
probably
 // reset its metadata when it becomes active to avoid stale metadata.
-DEFINE_bool_hidden(catalogd_ha_reset_metadata_on_failover, true, "If true, 
reset all "
-    "metadata when the catalogd becomes active.");
+DEFINE_bool(catalogd_ha_reset_metadata_on_failover, true, "If true, reset all 
metadata "
+    "when the catalogd becomes active. If false, catalogd keeps using its 
current "
+    "metadata but will apply all pending HMS events before being active. 
Setting this "
+    "to false requires enabling HMS notification events processing, i.e. "
+    "hms_event_polling_interval_s > 0");
+
+DEFINE_int32(catalogd_ha_failover_catchup_timeout_s, 300, "When "
+  "catalogd_ha_reset_metadata_on_failover is false, catalogd wait before 
transitioning to"
+  " the active state until pending HMS events are applied. This flag controls 
the timeout"
+  " for this wait.");
+
+DEFINE_bool(catalogd_ha_reset_metadata_on_failover_catchup_timeout, true, "If 
true, "
+  "catalogd will reset all metadata when it times out in catching up HMS 
events in HA "
+  "failover. If false, catalogd ignored the pending HMS events when timeout 
happens and "
+  "continue transitioning to the active state. This is only used when "
+  "catalogd_ha_reset_metadata_on_failover is false.");
 
 DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the 
entries "
     "of the catalog topic update log are garbage collected. An entry may 
survive "
@@ -802,7 +815,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
 void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
     int64_t active_catalogd_version, const TCatalogRegistration& 
catalogd_registration) {
-  lock_guard<mutex> l(catalog_lock_);
+  unique_lock<mutex> l(catalog_lock_);
   if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
           is_registration_reply, active_catalogd_version)) {
     return;
@@ -836,6 +849,11 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
         if (!status.ok()) {
           LOG(ERROR) << "Failed to refresh data sources triggered by catalogd 
failover.";
         }
+        // If HA state has been determined, this is a failover. Apply pending 
HMS events
+        // to avoid stale metadata. Note that only HMS events before the 
failover happens
+        // need to be applied. HMS events after that are OK to be applied 
later since they
+        // are not applied in the previous active catalogd as well.
+        if (is_ha_determined_) WaitUntilHmsEventsSynced(l);
       }
       // Signal the catalog update gathering thread to start.
       topic_updates_ready_ = false;
@@ -860,6 +878,81 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
   is_ha_determined_ = true;
 }
 
+void CatalogServer::WaitUntilHmsEventsSynced(const unique_lock<std::mutex>& 
lock) {
+  DCHECK(lock.mutex() == &catalog_lock_ && lock.owns_lock());
+  uint64_t timeout_ns = FLAGS_catalogd_ha_failover_catchup_timeout_s * 
NANOS_PER_SEC;
+  MonotonicStopWatch timeout_timer;
+  timeout_timer.Start();
+  LOG(INFO) << "Getting latest HMS event event id and waiting for 
EventProcessor to "
+               "sync up";
+  TEventProcessorMetricsSummaryResponse response;
+  TEventProcessorMetricsSummaryRequest req;
+  // Fetches the latest HMS event id from HMS directly in case the cached 
value in
+  // EventProcessor is stale. It's updated every 1s (by default) but could be 
stale due
+  // to slow HMS RPCs.
+  req.get_latest_event_from_hms = true;
+  Status status = catalog_->GetEventProcessorSummary(req, &response);
+  DCHECK(status.ok());
+  if (response.__isset.error_msg) {
+    triggered_first_reset_ = false;
+    LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata in 
failover";
+    return;
+  }
+  DCHECK(response.__isset.progress);
+  int64_t latest_hms_event_id = response.progress.latest_event_id;
+  int64_t last_synced_hms_event_id = response.progress.last_synced_event_id;
+  // If there are exceptions in fetching the latest HMS event id, 
'latest_hms_event_id'
+  // is set to -1. Retry since we do need a correct value.
+  while (latest_hms_event_id < 0 && timeout_timer.ElapsedTime() < timeout_ns) {
+    SleepForMs(100);
+    status = catalog_->GetEventProcessorSummary(req, &response);
+    DCHECK(status.ok());
+    DCHECK(response.__isset.progress);
+    latest_hms_event_id = response.progress.latest_event_id;
+  }
+  if (latest_hms_event_id < 0) {
+    triggered_first_reset_ = false;
+    LOG(ERROR) << "Timed out getting the latest event id from HMS. Resetting 
all metadata"
+                  "in failover";
+    return;
+  }
+  // Now that we figure out latest_hms_event_id from HMS, we loop and wait 
until
+  // last_synced_event_id catch up with this latest_hms_event_id or timeout 
passed.
+  // Setting get_latest_event_from_hms to false so GetEventProcessorSummary 
won't send
+  // HMS RPCs.
+  req.get_latest_event_from_hms = false;
+  while (last_synced_hms_event_id < latest_hms_event_id
+      && timeout_timer.ElapsedTime() < timeout_ns) {
+    LOG(INFO) << "Wait until the pending "
+        << (latest_hms_event_id - last_synced_hms_event_id)
+        << " HMS events are applied. Latest event in HMS: id=" << 
latest_hms_event_id
+        << ". Last synced event: id=" << last_synced_hms_event_id;
+    SleepForMs(REFRESH_METRICS_INTERVAL_MS);
+    status = catalog_->GetEventProcessorSummary(req, &response);
+    DCHECK(status.ok());
+    if (response.__isset.error_msg) {
+      triggered_first_reset_ = false;
+      LOG(ERROR) << "EventProcessor is in ERROR state. Resetting all metadata 
in "
+                    "failover";
+      return;
+    }
+    DCHECK(response.__isset.progress);
+    last_synced_hms_event_id = response.progress.last_synced_event_id;
+  }
+  if (last_synced_hms_event_id < latest_hms_event_id) {
+    LOG(WARNING) << "Timed out waiting for catching up HMS events.";
+    if (FLAGS_catalogd_ha_reset_metadata_on_failover_catchup_timeout) {
+      triggered_first_reset_ = false;
+      LOG(INFO) << "Fallback to resetting all metadata.";
+    } else {
+      LOG(WARNING) << "Continue with the current cache. Metadata might be 
stale.";
+    }
+    return;
+  }
+  LOG(INFO) << "EventProcessor is synced up with HMS event id during failover: 
"
+      << latest_hms_event_id;
+}
+
 [[noreturn]] void CatalogServer::TriggerResetMetadata() {
   while (true) {
     bool must_reset = false;
@@ -1173,8 +1266,10 @@ void CatalogServer::GetCatalogUsage(Document* document) {
 void CatalogServer::EventMetricsUrlCallback(
     const Webserver::WebRequest& req, Document* document) {
   auto& allocator = document->GetAllocator();
+  TEventProcessorMetricsSummaryRequest request;
   TEventProcessorMetricsSummaryResponse event_processor_summary_response;
-  Status status = 
catalog_->GetEventProcessorSummary(&event_processor_summary_response);
+  Status status = catalog_->GetEventProcessorSummary(
+      request, &event_processor_summary_response);
   if (!status.ok()) {
     Value error(status.GetDetail(), allocator);
     document->AddMember("error", error, allocator);
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 786fc00d2..5a01d7966 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -289,6 +289,12 @@ class CatalogServer {
   void UpdateActiveCatalogd(bool is_registration_reply, int64_t 
active_catalogd_version,
       const TCatalogRegistration& catalogd_registration);
 
+  /// Wait until the pending HMS events are applied. Used in HA failover 
before the
+  /// standby catalogd becomes active to get rid of stale metadata when
+  /// catalogd_ha_reset_metadata_on_failover is set to false. Callers should 
hold the
+  /// catalog_lock_.
+  void WaitUntilHmsEventsSynced(const std::unique_lock<std::mutex>& lock);
+
   /// Returns the current active status of the catalogd.
   bool IsActive();
 
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 752aa6833..c6af237ec 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -70,7 +70,7 @@ Catalog::Catalog() {
     {"getOperationUsage", "()[B", &get_operation_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
-    {"getEventProcessorSummary", "()[B", &get_event_processor_summary_},
+    {"getEventProcessorSummary", "([B)[B", &get_event_processor_summary_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_},
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
@@ -191,9 +191,9 @@ Status 
Catalog::GetOperationUsage(TGetOperationUsageResponse* response) {
   return JniUtil::CallJniMethod(catalog_, get_operation_usage_id_, response);
 }
 
-Status Catalog::GetEventProcessorSummary(
+Status Catalog::GetEventProcessorSummary(const 
TEventProcessorMetricsSummaryRequest& req,
     TEventProcessorMetricsSummaryResponse* response) {
-  return JniUtil::CallJniMethod(catalog_, get_event_processor_summary_, 
response);
+  return JniUtil::CallJniMethod(catalog_, get_event_processor_summary_, req, 
response);
 }
 
 Status Catalog::GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* 
response) {
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 17b95abb0..47fb8d619 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -118,7 +118,8 @@ class Catalog {
 
   /// Returns the metastore event processor summary view. The summary string
   /// in the response can contain detailed metrics along with status
-  Status GetEventProcessorSummary(TEventProcessorMetricsSummaryResponse* 
response);
+  Status GetEventProcessorSummary(const TEventProcessorMetricsSummaryRequest& 
req,
+      TEventProcessorMetricsSummaryResponse* response);
 
   /// Gets all functions in the catalog matching the parameters in the given
   /// TFunctionsRequest.
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index a72f7276d..460ed3cea 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -85,6 +85,8 @@ DECLARE_bool(symbolize_stacktrace);
 DECLARE_string(debug_actions);
 DECLARE_int64(thrift_rpc_max_message_size);
 DECLARE_int64(thrift_external_rpc_max_message_size);
+DECLARE_double(hms_event_polling_interval_s);
+DECLARE_bool(catalogd_ha_reset_metadata_on_failover);
 
 DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in 
milliseconds "
     "between memory maintenance iterations");
@@ -562,6 +564,14 @@ void impala::InitCommonRuntime(int argc, char** argv, bool 
init_jvm,
             FLAGS_thrift_external_rpc_max_message_size, 
ThriftDefaultMaxMessageSize()));
   }
 
+  if (!FLAGS_catalogd_ha_reset_metadata_on_failover
+      && FLAGS_hms_event_polling_interval_s <= 0) {
+    CLEAN_EXIT_WITH_ERROR(Substitute(
+        "Invalid hms_event_polling_interval_s: $0. It should be larger than 0 
when "
+        "--catalogd_ha_reset_metadata_on_failover is false",
+        FLAGS_hms_event_polling_interval_s));
+  }
+
   impala::InitGoogleLoggingSafe(argv[0]);
   // Breakpad needs flags and logging to initialize.
   if (!external_fe) {
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index ebb699f49..3ab6e3c84 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -1083,6 +1083,13 @@ struct TCopyTestCaseReq {
   1: required string input_path
 }
 
+struct TEventProcessorMetricsSummaryRequest {
+  // Whether to fetch the latest HMS event id using a HMS RPC or using the 
cached value
+  // in the EventProcessor. When set to true, 'latest_event_time_s' in the 
progress info
+  // will be -1 to save a HMS RPC.
+  1: required bool get_latest_event_from_hms = false
+}
+
 struct TEventBatchProgressInfo {
   // Number of original HMS events received in the current batch.
   1: required i32 num_hms_events
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 731da77cc..73f89ddff 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -116,6 +116,7 @@ import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TEventProcessorCmdParams;
 import org.apache.impala.thrift.TEventProcessorMetrics;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetCatalogUsageResponse;
@@ -4057,8 +4058,9 @@ public class CatalogServiceCatalog extends Catalog {
    * Gets the events processor summary. Used for populating the contents of 
the events
    * processor detailed view page
    */
-  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
-    return metastoreEventProcessor_.getEventProcessorSummary();
+  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
+      TEventProcessorMetricsSummaryRequest req) {
+    return metastoreEventProcessor_.getEventProcessorSummary(req);
   }
 
   public TSetEventProcessorStatusResponse setEventProcessorStatus(
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
index e48b7a2f7..fbe37db20 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
@@ -21,6 +21,7 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TEventProcessorMetrics;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.kudu.client.Delete;
 
@@ -83,7 +84,8 @@ public interface ExternalEventsProcessor {
    * Gets a detailed view of the event processor which can be used to populate 
the
    * content of a dedicated page for the event processor
    */
-  TEventProcessorMetricsSummaryResponse getEventProcessorSummary();
+  TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
+      TEventProcessorMetricsSummaryRequest req);
 
   /**
    * Gets the {@link MetastoreEventFactory} to be used for creating
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 59dd859df..2186afcba 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -87,6 +87,7 @@ import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TEventBatchProgressInfo;
 import org.apache.impala.thrift.TEventProcessorMetrics;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.impala.thrift.TImpalaTableType;
 import org.apache.impala.thrift.TStatus;
@@ -1407,7 +1408,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   }
 
   @Override
-  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
+  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
+      TEventProcessorMetricsSummaryRequest req) {
     TEventProcessorMetricsSummaryResponse summaryResponse =
         new TEventProcessorMetricsSummaryResponse();
     summaryResponse.setSummary(metrics_.toString());
@@ -1419,6 +1421,17 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     progressInfo.last_synced_event_time_s = lastSyncedEventTimeSecs_.get();
     progressInfo.latest_event_id = latestEventId_.get();
     progressInfo.latest_event_time_s = latestEventTimeSecs_.get();
+    if (req.get_latest_event_from_hms) {
+      try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+        progressInfo.latest_event_id =
+            
metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+        progressInfo.latest_event_time_s = -1;
+      } catch (MetastoreClientInstantiationException | TException e) {
+        progressInfo.latest_event_id = - 1;
+        progressInfo.latest_event_time_s = -1;
+        LOG.warn("Failed to fetch the latest HMS event id. Returning -1", e);
+      }
+    }
     // Assign these lists to local variables in case they are replaced 
concurrently.
     // It's best effort to make the members in 'progressInfo' consistent but 
we can't
     // guarantee it.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java 
b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
index c5eb5ff7f..986c01429 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/NoOpEventProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.thrift.TEventProcessorMetrics;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 
 /**
@@ -98,7 +99,8 @@ public class NoOpEventProcessor implements 
ExternalEventsProcessor {
   }
 
   @Override
-  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary() {
+  public TEventProcessorMetricsSummaryResponse getEventProcessorSummary(
+      TEventProcessorMetricsSummaryRequest req) {
     return DEFAULT_SUMMARY_RESPONSE;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index ec145ef86..8248f2788 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -64,6 +64,7 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TGetCatalogDeltaRequest;
 import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
@@ -517,10 +518,19 @@ public class JniCatalog {
     return execAndSerialize("getOperationUsage", shortDesc, 
catalog_::getOperationUsage);
   }
 
-  public byte[] getEventProcessorSummary() throws ImpalaException, TException {
+  public byte[] getEventProcessorSummary(byte[] req)
+      throws ImpalaException, TException {
+    TEventProcessorMetricsSummaryRequest thriftReq =
+        new TEventProcessorMetricsSummaryRequest();
+    JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
+
     String shortDesc = "Getting event processor summary";
+    if (thriftReq.get_latest_event_from_hms) {
+      shortDesc += " (get_latest_event_from_hms=true)";
+    }
     return execAndSerialize(
-        "getEventProcessorSummary", shortDesc, 
catalog_::getEventProcessorSummary);
+        "getEventProcessorSummary", shortDesc,
+        () -> catalog_.getEventProcessorSummary(thriftReq));
   }
 
   public byte[] setEventProcessorStatus(byte[] thriftParams)
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 091a2bfaa..a409a6630 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -146,6 +146,7 @@ import org.apache.impala.thrift.TDropDbParams;
 import org.apache.impala.thrift.TDropFunctionParams;
 import org.apache.impala.thrift.TDropTableOrViewParams;
 import org.apache.impala.thrift.TEventProcessorMetrics;
+import org.apache.impala.thrift.TEventProcessorMetricsSummaryRequest;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.THdfsFileFormat;
@@ -1960,7 +1961,7 @@ public class MetastoreEventsProcessorTest {
     assertTrue("Event process duration should be greater than zero",
         response.getEvents_process_duration_mean() > 0);
     TEventProcessorMetricsSummaryResponse summaryResponse =
-        catalog_.getEventProcessorSummary();
+        catalog_.getEventProcessorSummary(new 
TEventProcessorMetricsSummaryRequest());
     assertNotNull(summaryResponse);
     assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
   }
@@ -1991,7 +1992,7 @@ public class MetastoreEventsProcessorTest {
     assertTrue("we do not turn off disableHmsSync for table testTblName1",
             response.getEvents_skipped() >= numEventsSkippedBefore);
     TEventProcessorMetricsSummaryResponse summaryResponse =
-            catalog_.getEventProcessorSummary();
+            catalog_.getEventProcessorSummary(new 
TEventProcessorMetricsSummaryRequest());
     assertNotNull(summaryResponse);
     assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
 
@@ -2052,7 +2053,8 @@ public class MetastoreEventsProcessorTest {
       assertFalse(response.isSetEvents_received_5min_rate());
       assertFalse(response.isSetEvents_received_15min_rate());
       TEventProcessorMetricsSummaryResponse summaryResponse =
-          eventsProcessor_.getEventProcessorSummary();
+          eventsProcessor_.getEventProcessorSummary(
+              new TEventProcessorMetricsSummaryRequest());
       assertNotNull(summaryResponse);
       // Last synced id must be set even when event processor is not active.
       assertTrue(response.isSetLast_synced_event_id());
@@ -2075,7 +2077,7 @@ public class MetastoreEventsProcessorTest {
     assertNotNull(response);
     assertEquals(EventProcessorStatus.DISABLED.toString(), 
response.getStatus());
     TEventProcessorMetricsSummaryResponse summaryResponse =
-        testCatalog.getEventProcessorSummary();
+        testCatalog.getEventProcessorSummary(new 
TEventProcessorMetricsSummaryRequest());
     assertNotNull(summaryResponse);
   }
   /**
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index beda5569d..289d8ec4f 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -530,6 +530,7 @@ class TestCatalogdHA(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@2000 "
                   "--warmup_tables_config_file="
                   
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
     start_args="--enable_catalogd_ha")
@@ -537,9 +538,40 @@ class TestCatalogdHA(CustomClusterTestSuite):
     """Verify that the metadata is warmed up in the standby catalogd."""
     for catalogd in self.__get_catalogds():
       self._test_warmed_up_tables(catalogd.service)
-    latest_catalogd = self._test_metadata_after_failover(unique_database, True)
+    latest_catalogd = self._test_metadata_after_failover(
+        unique_database, skip_func_test=True)
     self._test_warmed_up_tables(latest_catalogd)
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
+                  "--catalogd_ha_failover_catchup_timeout_s=2 "
+                  "--warmup_tables_config_file="
+                  
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
+    start_args="--enable_catalogd_ha")
+  def test_failover_catchup_timeout_and_reset(self, unique_database):
+    self._test_metadata_after_failover(unique_database, skip_func_test=True)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@3000 "
+                  "--catalogd_ha_failover_catchup_timeout_s=2 "
+                  
"--catalogd_ha_reset_metadata_on_failover_catchup_timeout=false "
+                  "--warmup_tables_config_file="
+                  
"{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
+    start_args="--enable_catalogd_ha")
+  def test_failover_catchup_timeout_not_reset(self, unique_database):
+    # Use allow_table_not_exists=True since the table is missing due to 
catalog not reset.
+    latest_catalogd = self._test_metadata_after_failover(
+        unique_database, allow_table_not_exists=True, skip_func_test=True)
+    # Verify tables are still loaded
+    self._test_warmed_up_tables(latest_catalogd)
+    # Run a global IM to bring up 'unique_database' in the new catalogd. 
Otherwise, the
+    # cleanup_database step will fail.
+    self.execute_query("invalidate metadata")
+
   def _test_warmed_up_tables(self, catalogd):
     db = "tpcds"
     tables = ["customer", "date_dim", "item", "store_sales"]
@@ -547,7 +579,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
       catalogd.verify_table_metadata_loaded(db, table)
     catalogd.verify_table_metadata_loaded(db, "store", expect_loaded=False)
 
-  def _test_metadata_after_failover(self, unique_database, 
skip_func_test=False):
+  def _test_metadata_after_failover(self, unique_database,
+                                    allow_table_not_exists=False, 
skip_func_test=False):
     """Verify that the metadata is correct after failover. Returns the current 
active
     catalogd"""
     (active_catalogd, standby_catalogd) = self.__get_catalogds()
@@ -581,7 +614,14 @@ class TestCatalogdHA(CustomClusterTestSuite):
       self.execute_query_expect_success(
           self.client, "select %s.identity_tmp(10)" % unique_database)
 
-    self.execute_query_expect_success(self.client, "describe %s.tbl" % 
unique_database)
+    # Check if the new active catalogd has the new table in its cache.
+    try:
+      self.execute_query("describe %s.tbl" % unique_database)
+    except Exception as e:
+      if not allow_table_not_exists:
+        # Due to IMPALA-14228, the query could still fail. But it's not due to 
stale
+        # metadata so allow this until we resolve IMPALA-14228.
+        assert "Error making an RPC call to Catalog server" in str(e)
     return catalogd_service_2
 
   def test_page_with_disable_ha(self):

Reply via email to