This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 28bc2bdb3 IMPALA-12849: Standby catalogd should reject requests from 
coordinators
28bc2bdb3 is described below

commit 28bc2bdb32ce8e9cd08e4ae21a8a067a513c714c
Author: wzhou-code <[email protected]>
AuthorDate: Tue Feb 27 17:21:25 2024 -0800

    IMPALA-12849: Standby catalogd should reject requests from coordinators
    
    In a catalog HA enabled cluster, it's possible that the standby catalogd
    could receive requests from coordinators in a short window after catalog
    fail-over is triggered since the coordinators may receive the fail-over
    notification from statestore with a delay. In this scenario,
    the standby catalogd should reject all requests from coordinators so
    that only one catalogd serves the catalog service for the cluster.
    
    This patch checks if the catalog server is active when handling requests
    from clients.
    
    Testing:
     - Added end-to-end unit-test.
     - Passed core tests.
    
    Change-Id: Iea38bdf4f207af657e71670a572efc7c0a0ba807
    Reviewed-on: http://gerrit.cloudera.org:8080/21086
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc         | 32 ++++++++++++++++++++------------
 be/src/catalog/catalog-server.h          |  3 +++
 be/src/runtime/exec-env.cc               |  3 +++
 tests/custom_cluster/test_catalogd_ha.py | 32 ++++++++++++++++++++++++++++++++
 4 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 529d9523c..7d53c2423 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -227,12 +227,14 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
  public:
   CatalogServiceThriftIf(CatalogServer* catalog_server)
       : catalog_server_(catalog_server) {
+    server_address_ = TNetworkAddressToString(
+        MakeNetworkAddress(FLAGS_hostname, FLAGS_catalog_service_port));
   }
 
   // Executes a TDdlExecRequest and returns details on the result of the 
operation.
   void ExecDdl(TDdlExecResponse& resp, const TDdlExecRequest& req) override {
     VLOG_RPC << "ExecDdl(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->ExecDdl(req, &resp);
     }
@@ -248,7 +250,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
       override {
     VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
     DebugActionNoFail(FLAGS_debug_actions, "RESET_METADATA_DELAY");
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->ResetMetadata(req, &resp);
     }
@@ -264,7 +266,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void UpdateCatalog(TUpdateCatalogResponse& resp, const 
TUpdateCatalogRequest& req)
       override {
     VLOG_RPC << "UpdateCatalog(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
     }
@@ -280,7 +282,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void GetFunctions(TGetFunctionsResponse& resp, const TGetFunctionsRequest& 
req)
       override {
     VLOG_RPC << "GetFunctions(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetFunctions(req, &resp);
     }
@@ -295,7 +297,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void GetCatalogObject(TGetCatalogObjectResponse& resp,
       const TGetCatalogObjectRequest& req) override {
     VLOG_RPC << "GetCatalogObject(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetCatalogObject(
           req.object_desc, &resp.catalog_object);
@@ -317,7 +319,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     // so a heavy query workload against a table undergoing a slow refresh 
doesn't
     // end up taking down the catalog by creating thousands of threads.
     VLOG_RPC << "GetPartialCatalogObject(): request=" << 
ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
     }
@@ -331,7 +333,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void GetPartitionStats(TGetPartitionStatsResponse& resp,
       const TGetPartitionStatsRequest& req) override {
     VLOG_RPC << "GetPartitionStats(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetPartitionStats(req, &resp);
     }
@@ -348,7 +350,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void PrioritizeLoad(TPrioritizeLoadResponse& resp, const 
TPrioritizeLoadRequest& req)
       override {
     VLOG_RPC << "PrioritizeLoad(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->PrioritizeLoad(req);
     }
@@ -362,7 +364,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void UpdateTableUsage(TUpdateTableUsageResponse& resp,
       const TUpdateTableUsageRequest& req) override {
     VLOG_RPC << "UpdateTableUsage(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->UpdateTableUsage(req);
     }
@@ -376,7 +378,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void GetNullPartitionName(TGetNullPartitionNameResponse& resp,
       const TGetNullPartitionNameRequest& req) override {
     VLOG_RPC << "GetNullPartitionName(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetNullPartitionName(&resp);
     }
@@ -390,7 +392,7 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
   void GetLatestCompactions(TGetLatestCompactionsResponse& resp,
       const TGetLatestCompactionsRequest& req) override {
     VLOG_RPC << "GetLatestCompactions(): request=" << ThriftDebugString(req);
-    Status status = CheckProtocolVersion(req.protocol_version);
+    Status status = AcceptRequest(req.protocol_version);
     if (status.ok()) {
       status = catalog_server_->catalog()->GetLatestCompactions(req, &resp);
     }
@@ -403,12 +405,18 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
 
  private:
   CatalogServer* catalog_server_;
+  string server_address_;
 
-  Status CheckProtocolVersion(CatalogServiceVersion::type client_version) {
+  // Check if catalog protocols are compatible between client and catalog 
server.
+  // Return Status::OK() if the protocols are compatible and catalog server is 
active.
+  Status AcceptRequest(CatalogServiceVersion::type client_version) {
     Status status = Status::OK();
     if (client_version < catalog_server_->GetProtocolVersion()) {
       status = Status(TErrorCode::CATALOG_INCOMPATIBLE_PROTOCOL, 
client_version + 1,
           catalog_server_->GetProtocolVersion() + 1);
+    } else if (FLAGS_enable_catalogd_ha && !catalog_server_->IsActive()) {
+      status = Status(Substitute("Request for Catalog service is rejected 
since "
+          "catalogd $0 is in standby mode", server_address_));
     }
     return status;
   }
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 5b3f527da..7ffab50b0 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -40,6 +40,7 @@ namespace impala {
 
 class ActiveCatalogdVersionChecker;
 class Catalog;
+class CatalogServiceThriftIf;
 class StatestoreSubscriber;
 
 /// The Impala CatalogServer manages the caching and persistence of 
cluster-wide metadata.
@@ -129,6 +130,8 @@ class CatalogServer {
   }
 
  private:
+  friend class CatalogServiceThriftIf;
+
   /// Protocol version of the Catalog Service.
   CatalogServiceVersion::type protocol_version_;
 
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index c31bd597f..d0430dd4c 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -148,6 +148,7 @@ DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
 DECLARE_int32(state_store_2_port);
 
+DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
 
 DEFINE_int32(backend_client_connection_num_retries, 3, "Retry backend 
connections.");
@@ -712,6 +713,8 @@ void ExecEnv::UpdateActiveCatalogd(bool 
is_registration_reply,
   bool is_matching = (catalogd_registration.address.port == 
catalogd_address_->port
       && catalogd_registration.address.hostname == 
catalogd_address_->hostname);
   if (!is_matching) {
+    RETURN_VOID_IF_ERROR(
+        DebugAction(FLAGS_debug_actions, "IGNORE_NEW_ACTIVE_CATALOGD_ADDR"));
     LOG(INFO) << "The address of Catalog service is changed from "
               << TNetworkAddressToString(*catalogd_address_.get())
               << " to " << 
TNetworkAddressToString(catalogd_registration.address);
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index bf812db5f..244b8a58f 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -317,6 +317,38 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(successful_update_catalogd_rpc_num >= 10)
     assert(failed_update_catalogd_rpc_num == 
successful_update_catalogd_rpc_num)
 
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    impalad_args="--debug_actions=IGNORE_NEW_ACTIVE_CATALOGD_ADDR:[email protected]",
+    start_args="--enable_catalogd_ha")
+  def test_manual_failover_with_coord_ignore_notification(self):
+    """Tests for Catalog Service manual failover with coordinators to ignore 
failover
+    notification."""
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    # Restart standby catalogd with force_catalogd_active as true.
+    catalogds[1].kill()
+    catalogds[1].start(wait_until_ready=True,
+                       additional_args="--force_catalogd_active=true")
+    # Wait until original active catalogd becomes in-active.
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_1.wait_for_metric_value(
+        "catalog-server.active-status", expected_value=False, timeout=15)
+    assert(not 
catalogd_service_1.get_metric_value("catalog-server.active-status"))
+
+    # Run query to create a table. Coordinator still send request to 
catalogd_service_1
+    # so that the request will be rejected.
+    ddl_query = "CREATE TABLE coordinator_ignore_notification (c int)"
+    ex = self.execute_query_expect_failure(self.client, ddl_query)
+    assert "Request for Catalog service is rejected" in str(ex)
+
   @CustomClusterTestSuite.with_args(
     statestored_args="--use_subscriber_id_as_catalogd_priority=true",
     start_args="--enable_catalogd_ha")

Reply via email to