This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 7403d10a5 IMPALA-12550: Fix flaky test
test_statestored_auto_failover_with_disabling_network
7403d10a5 is described below
commit 7403d10a55397f81784f369aa501b9c072d198b6
Author: wzhou-code <[email protected]>
AuthorDate: Wed Nov 8 12:20:19 2023 -0800
IMPALA-12550: Fix flaky test
test_statestored_auto_failover_with_disabling_network
Test test_statestored_auto_failover_with_disabling_network failed
occasionally due to delays in the HA handshake or HA heartbeat RPCs between
the two statestore instances. Sometimes the active statestore took a few
minutes to respond to handshake requests from the standby statestore.
This patch fixes the issue by not holding the mutex ha_lock_ while sending
HA handshake and HA heartbeat RPCs. Redundant HA heartbeats are handled
on the receiver side. Redundant HA handshakes are harmless.
Testing:
- Repeatedly ran test_statestored_auto_failover_with_disabling_network
on Jenkins hundreds of times without failure.
- Repeatedly ran test_statestored_auto_failover_with_disabling_network
on a local machine a thousand times without failure.
- Repeatedly ran all tests in test_statestored_ha.py for over 12 hours
on Jenkins without failure.
- Passed core tests.
Change-Id: I515bbaaddfb4bf9bd2a39414cd6e3e4590dfbfb1
Reviewed-on: http://gerrit.cloudera.org:8080/20689
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/statestore/statestore-subscriber.cc | 9 ++--
be/src/statestore/statestore.cc | 81 ++++++++++++++++++++++++-----
be/src/statestore/statestore.h | 10 ++++
tests/custom_cluster/test_statestored_ha.py | 9 ++--
4 files changed, 87 insertions(+), 22 deletions(-)
diff --git a/be/src/statestore/statestore-subscriber.cc
b/be/src/statestore/statestore-subscriber.cc
index e62e89219..24b9e7e59 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -396,7 +396,8 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId&
registration_id,
*active_statestore_conn_state = statestore_->GetStatestoreConnState();
}
} else {
- VLOG(3) << "Ignore heartbeat message from unknown statestored: " <<
statestore_id;
+ VLOG(3) << "Ignore heartbeat message from unknown statestored: "
+ << PrintId(statestore_id);
}
}
@@ -416,7 +417,7 @@ void StatestoreSubscriber::UpdateCatalogd(
// the future.
*update_skipped = true;
LOG(INFO) << "Skipped updating catalogd message from unknown or inactive "
- << "statestored: " << statestore_id;
+ << "statestored: " << PrintId(statestore_id);
}
}
@@ -494,7 +495,7 @@ void StatestoreSubscriber::UpdateStatestoredRole(bool
is_active,
// update in the future.
*update_skipped = true;
LOG(INFO) << "Skipped updating statestored message from unknown
statestored: "
- << statestore_id;
+ << PrintId(statestore_id);
}
}
@@ -523,7 +524,7 @@ Status StatestoreSubscriber::UpdateState(const
TopicDeltaMap& incoming_topic_del
// future.
*skipped = true;
VLOG(3) << "Skipped topic update message from unknown or inactive
statestored: "
- << statestore_id;
+ << PrintId(statestore_id);
return Status::OK();
}
}
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 349ebc091..df2573b2a 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -208,6 +208,13 @@ const string STATESTORE_CONNECTED_PEER =
"statestore.connected-with-peer-statest
// an entry with the initial version.
const Statestore::TopicEntry::Version
Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
+// If statestore instance in active state receives more than 10 heartbeats
from its peer,
+// enter recovery mode to re-negotiate role with its peer.
+// Heartbeat period is set by
FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms, its
+// default value is 1000 ms. That means statestore instance in active state
will enter
+// recovery mode in 10 seconds if it repeatedly receives heartbeats from its
peer.
+#define MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE 10
+
// Updates or heartbeats that miss their deadline by this much are logged.
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
@@ -674,6 +681,7 @@ Statestore::Statestore(MetricGroup* metrics)
FLAGS_statestore_max_missed_heartbeats,
FLAGS_statestore_max_missed_heartbeats / 2)) {
UUIDToTUniqueId(boost::uuids::random_generator()(), &statestore_id_);
+ LOG(INFO) << "Statestore ID: " << PrintId(statestore_id_);
DCHECK(metrics != NULL);
metrics_ = metrics;
num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
@@ -712,6 +720,7 @@ Statestore::Statestore(MetricGroup* metrics)
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
+ num_received_heartbeat_in_active_ = 0;
} else {
is_active_ = false;
active_status_metric_->SetValue(is_active_);
@@ -1823,6 +1832,7 @@ Status Statestore::InitStatestoreHa(
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
+ num_received_heartbeat_in_active_ = 0;
LOG(INFO) << "Set Statestore as active since it does not receive handshake
"
<< "response in HA preemption waiting period";
found_peer_ = false;
@@ -1855,6 +1865,11 @@ int64_t Statestore::GetActiveVersion(bool* is_active) {
return active_version_;
}
+bool Statestore::IsInRecoveryMode() {
+ lock_guard<mutex> l(ha_lock_);
+ return in_recovery_mode_;
+}
+
Status Statestore::SendHaHandshake(TStatestoreHaHandshakeResponse* response) {
if (disable_network_.Load()) {
return Status("Don't send HA handshake since network is disabled.");
@@ -1906,6 +1921,7 @@ Status Statestore::ReceiveHaHandshakeRequest(const
TUniqueId& peer_statestore_id
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
+ num_received_heartbeat_in_active_ = 0;
ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
LOG(INFO) << "Set the statestored as active since it's started with
force active "
<< "flag";
@@ -1924,6 +1940,8 @@ Status Statestore::ReceiveHaHandshakeRequest(const
TUniqueId& peer_statestore_id
}
LOG(INFO) << "Set the statestored as " << (is_active_ ? "active" :
"standby");
}
+ } else {
+ LOG(INFO) << "Active state of statestored is not changed";
}
*statestore_active = is_active_;
if (!found_peer_) {
@@ -1942,11 +1960,16 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
// process HA heartbeat from active statestore
ha_active_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
} else {
- // Receive heartbeat from its peer statestored. That means both
statestored designate
- // themselves as active. Enter recovery mode to restart negotiation.
+ num_received_heartbeat_in_active_++;
+ if (num_received_heartbeat_in_active_ <=
MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE) {
+ return;
+ }
+ // Repeatedly receive heartbeat from its peer statestored. That means both
statestored
+ // designate themselves as active. Enter recovery mode to restart
negotiation.
LOG(WARNING)
<< "Both statestoreds designate themselves as active, restart
negotiation.";
in_recovery_mode_ = true;
+ recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
is_active_ = false;
active_status_metric_->SetValue(is_active_);
@@ -1954,19 +1977,30 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
}
}
+// TODO: break this function to 3 functions for each branch: recovery-mode,
active state,
+// and standby state.
[[noreturn]] void Statestore::MonitorStatestoredHaHeartbeat() {
+ bool sleep_between_processing = true;
while (1) {
- SleepForMs(FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms);
- lock_guard<mutex> l(ha_lock_);
- if (in_recovery_mode_) {
+ if (sleep_between_processing) {
+ SleepForMs(FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms);
+ } else {
+ sleep_between_processing = true;
+ }
+ if (IsInRecoveryMode()) {
// Keep sending HA handshake request to its peer periodically until
receiving
- // response.
+ // response. Don't hold the ha_lock_ when sending HA handshake.
TStatestoreHaHandshakeResponse response;
Status status = SendHaHandshake(&response);
if (!status.ok()) continue;
status = Status(response.status);
DCHECK(status.ok());
+ lock_guard<mutex> l(ha_lock_);
+ if (!in_recovery_mode_) {
+ sleep_between_processing = false;
+ continue;
+ }
// Exit "recovery" mode.
in_recovery_mode_ = false;
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
@@ -1975,20 +2009,32 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
active_status_metric_->SetValue(is_active_);
found_peer_ = true;
connected_peer_metric_->SetValue(found_peer_);
- LOG(INFO) << "Receive Statestore HA handshake response, exit HA recovery
mode. "
- << "Set the statestored as " << (is_active_ ? "active" :
"standby");
+ int64_t elapsed_ms = MonotonicMillis() - recovery_start_time_;
+ LOG(INFO) << "Receive Statestore HA handshake response, exit HA recovery
mode in "
+ << PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS)
+ << ". Set the statestored as " << (is_active_ ? "active" :
"standby");
if (is_active_) {
active_version_ = UnixMicros();
// Send notification to all subscribers.
update_statestored_cv_.NotifyAll();
}
- } else if (is_active_) {
+ } else if (IsActive()) {
// Statestored in active state
// Send HA heartbeat to standby statestored.
- if (found_peer_) {
+ bool send_heartbeat = false;
+ {
+ lock_guard<mutex> l(ha_lock_);
+ if (is_active_ && found_peer_) send_heartbeat = true;
+ }
+ if (send_heartbeat) {
Status status = SendHaHeartbeat();
if (status.ok()) continue;
}
+ lock_guard<mutex> l(ha_lock_);
+ if (!is_active_) {
+ sleep_between_processing = false;
+ continue;
+ }
// Check if standby statestored is reachable.
FailureDetector::PeerState state =
ha_standby_ss_failure_detector_->GetPeerState(STATESTORE_ID);
@@ -2001,12 +2047,13 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
LOG(INFO) << "Statestored lost connection with peer statestored";
}
- lock_guard<mutex> l(subscribers_lock_);
+ lock_guard<mutex> l2(subscribers_lock_);
if (subscribers_.size() == 0) {
// To avoid race with new active statestored, original active
statestored enter
// "recovery" mode if it does not receive heartbeat responses from
standby
// statestored and all subscribers.
in_recovery_mode_ = true;
+ recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
is_active_ = false;
active_status_metric_->SetValue(is_active_);
@@ -2017,6 +2064,11 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
} else {
// Statestored in standby state
// Monitor connection state with its peer statestored.
+ lock_guard<mutex> l(ha_lock_);
+ if (is_active_) {
+ sleep_between_processing = false;
+ continue;
+ }
FailureDetector::PeerState state =
ha_active_ss_failure_detector_->GetPeerState(STATESTORE_ID);
// Check if the majority of subscribers lost connection with active
statestored.
@@ -2050,12 +2102,14 @@ void Statestore::HaHeartbeatRequest(const TUniqueId&
dst_statestore_id,
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
+ num_received_heartbeat_in_active_ = 0;
// Send notification to all subscribers.
update_statestored_cv_.NotifyAll();
} else if (total_subscribers == 0) {
// If there is no subscriber, it means this statestored lost
connection with
// other nodes in the cluster, enter "recovery" mode.
in_recovery_mode_ = true;
+ recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
LOG(WARNING) << "Enter HA recovery mode.";
} else {
@@ -2080,7 +2134,10 @@ Status Statestore::SendHaHeartbeat() {
TStatestoreHaHeartbeatRequest request;
TStatestoreHaHeartbeatResponse response;
- request.__set_dst_statestore_id(peer_statestore_id_);
+ {
+ lock_guard<mutex> l(ha_lock_);
+ request.__set_dst_statestore_id(peer_statestore_id_);
+ }
request.__set_src_statestore_id(statestore_id_);
status =
client.DoRpc(&StatestoreHaServiceClientWrapper::StatestoreHaHeartbeat,
request, &response);
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index c55cc227b..11b0fb6fe 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -794,6 +794,13 @@ class Statestore : public CacheLineAligned {
/// True if the statestore instance is in recovery mode.
bool in_recovery_mode_ = false;
+ /// Starting time to enter recovery mode.
+ int64_t recovery_start_time_;
+
+ /// Number of HA heartbeat received in active state.
+ /// Reset this variable whenever `is_active_` is set to true.
+ int num_received_heartbeat_in_active_ = 0;
+
/// Disable network if this variable is set as true by statestore service
API.
/// This is only used for unit-test.
AtomicBool disable_network_{false};
@@ -999,6 +1006,9 @@ class Statestore : public CacheLineAligned {
/// Raw callback to indicate whether the service is ready.
void HealthzHandler(const Webserver::WebRequest& req, std::stringstream*
data,
HttpStatusCode* response);
+
+ // Return true if this statestore instance is in recovery mode.
+ bool IsInRecoveryMode();
};
} // namespace impala
diff --git a/tests/custom_cluster/test_statestored_ha.py
b/tests/custom_cluster/test_statestored_ha.py
index 3eec88a96..1e9ec17f4 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -629,12 +629,9 @@ class TestStatestoredHA(CustomClusterTestSuite):
# Re-enable network for standby statestored. Verify that the statestored
exits
# HA recovery mode.
self.__disable_statestored_network(disable_network=False)
- # IMPALA-12550: sometimes the active statestore takes a few minutes to
response
- # the HA handshake from standby statestore. Temporarily comment out
following
- # two lines util the issue is fixed.
- # statestore_service_0.wait_for_metric_value(
- # "statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
- # assert(not
statestore_service_0.get_metric_value("statestore.active-status"))
+ statestore_service_0.wait_for_metric_value(
+ "statestore.in-ha-recovery-mode", expected_value=False, timeout=120)
+ assert(not
statestore_service_0.get_metric_value("statestore.active-status"))
class TestStatestoredHAStartupDelay(CustomClusterTestSuite):