This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dfe47c639a53fb10feffc596d10ea369e1212c9b
Author: wzhou-code <[email protected]>
AuthorDate: Fri Jun 23 15:59:05 2023 -0700

    IMPALA-12235: Fixed start-impala-cluster failure
    
    IMPALA-12150 added statestore_id to the request structures of the
    StatestoreSubscriber service, and checks statestore_id when handling
    requests from statestore. However, topic update messages may arrive
    before the registration response. In that case, statestore_id has not
    been received yet when the topic update messages are handled. We should
    handle these incoming messages instead of skipping them; otherwise,
    subscribers may lose cluster membership messages while the Impala
    cluster is starting.
    
    This patch also changes the default value of the startup flag
    tolerate_statestore_startup_delay to true.
    
    Testing:
     - Passed the core-tests.
    
    Change-Id: Ifa5fe5e644b8c1662e3cc77aa724ed2690f83ae6
    Reviewed-on: http://gerrit.cloudera.org:8080/20126
    Reviewed-by: Andrew Sherman <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/common/global-flags.cc              |  2 +-
 be/src/statestore/statestore-subscriber.cc | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0b1515f4f..0c8d99e23 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -378,7 +378,7 @@ DEFINE_bool(pull_table_types_and_comments, false,
     "catalogd-only flag. Required if users want GET_TABLES requests return 
correct table "
     "types or comments.");
 
-DEFINE_bool(tolerate_statestore_startup_delay, false, "If set to true, the 
subscriber "
+DEFINE_bool(tolerate_statestore_startup_delay, true, "If set to true, the 
subscriber "
     "is able to tolerate the delay of the statestore's availability. The 
subscriber's "
     "process will not exit if it cannot register with the specified statestore 
on "
     "startup. But instead it enters into Recovery mode, it will loop, sleep 
and retry "
diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index 909914e6a..134c614f0 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -268,6 +268,8 @@ void StatestoreSubscriber::Heartbeat(
   // It's possible the heartbeat is received for previous registration.
   if (statestore_->IsMatchingStatestoreId(statestore_id)) {
     statestore_->Heartbeat(registration_id);
+  } else {
+    VLOG(3) << "Ignore heartbeat message from unknown statestored: " << 
statestore_id;
   }
 }
 
@@ -277,6 +279,9 @@ void StatestoreSubscriber::UpdateCatalogd(
     const TUniqueId& statestore_id, int64 sequence) {
   if (statestore_->IsMatchingStatestoreId(statestore_id)) {
     statestore_->UpdateCatalogd(catalogd_registration, registration_id, 
sequence);
+  } else {
+    VLOG(3) << "Ignore updating catalogd message from unknown statestored: "
+            << statestore_id;
   }
 }
 
@@ -286,6 +291,8 @@ Status StatestoreSubscriber::UpdateState(const 
TopicDeltaMap& incoming_topic_del
   if (statestore_->IsMatchingStatestoreId(statestore_id)) {
     return statestore_->UpdateState(
         incoming_topic_deltas, registration_id, subscriber_topic_updates, 
skipped);
+  } else {
+    VLOG(3) << "Ignore topic update message from unknown statestored: " << 
statestore_id;
   }
   return Status::OK();
 }
@@ -483,7 +490,7 @@ Status StatestoreSubscriber::StatestoreStub::Start(bool* 
has_active_catalogd,
     } else {
       LOG(INFO) << "statestore registration unsuccessful on startup: "
                 << status.GetDetail();
-      if (FLAGS_tolerate_statestore_startup_delay) {
+      if (FLAGS_tolerate_statestore_startup_delay && !TestInfo::is_be_test()) {
         LOG(INFO) << "Tolerate the delay of the statestore's availability on 
startup";
         status = Status::OK();
       }
@@ -612,7 +619,11 @@ Status 
StatestoreSubscriber::StatestoreStub::CheckRegistrationIdAndUpdateCatalog
 bool StatestoreSubscriber::StatestoreStub::IsMatchingStatestoreId(
     const TUniqueId statestore_id) {
   lock_guard<mutex> r(id_lock_);
-  return statestore_id == statestore_id_;
+  // It's possible the topic update messages are received before receiving the
+  // registration response. In the case, statestore_id_ and is_registered_ are 
not set.
+  // TODO: need to revisit this when supporting statestored HA.
+  return statestore_id == statestore_id_ ||
+      (!is_registered_ && statestore_id_.hi == 0 && statestore_id_.lo == 0);
 }
 
 void StatestoreSubscriber::StatestoreStub::Heartbeat(

Reply via email to