Repository: impala Updated Branches: refs/heads/master e1c9cbd07 -> 93a0ce857
IMPALA-7857: log more information about statestore failure detection This adds a couple of log messages for state transitions in the statestore's failure detector. Testing: Ran test_statestore.py and checked for the presence of the new log messages. Added a new test to test_statestore.py that exercises handling of intermittent heartbeat failures (required to produce one of the new log messages). Change-Id: Ie6ff85bee117000e4434dcffd3d1680a79905f14 Reviewed-on: http://gerrit.cloudera.org:8080/11937 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/93a0ce85 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/93a0ce85 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/93a0ce85 Branch: refs/heads/master Commit: 93a0ce857f181f2fe4248252428fc2adfdf1bdb7 Parents: e1c9cbd Author: Tim Armstrong <[email protected]> Authored: Thu Nov 15 14:42:26 2018 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Nov 19 22:55:40 2018 +0000 ---------------------------------------------------------------------- be/src/statestore/failure-detector.cc | 28 +++++++++++++++++++--------- be/src/statestore/failure-detector.h | 3 +++ tests/statestore/test_statestore.py | 22 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/failure-detector.cc b/be/src/statestore/failure-detector.cc index 9aeaff8..8043bed 100644 --- a/be/src/statestore/failure-detector.cc +++ b/be/src/statestore/failure-detector.cc @@ -76,30 +76,40 @@ FailureDetector::PeerState MissedHeartbeatFailureDetector::UpdateHeartbeat( const string& peer, 
bool seen) { { lock_guard<mutex> l(lock_); + int32_t* missed_heartbeat_count = &missed_heartbeat_counts_[peer]; if (seen) { - missed_heartbeat_counts_[peer] = 0; + if (*missed_heartbeat_count != 0) { + LOG(INFO) << "Heartbeat for '" << peer << "' succeeded after " + << *missed_heartbeat_count << " missed heartbeats. " + << "Resetting missed heartbeat count."; + *missed_heartbeat_count = 0; + } return OK; } else { - ++missed_heartbeat_counts_[peer]; + ++(*missed_heartbeat_count); + LOG(INFO) << *missed_heartbeat_count << " consecutive heartbeats failed for " + << "'" << peer << "'. State is " + << PeerStateToString(ComputePeerState(*missed_heartbeat_count)); } } - return GetPeerState(peer); } FailureDetector::PeerState MissedHeartbeatFailureDetector::GetPeerState( const string& peer) { lock_guard<mutex> l(lock_); - map<string, int32_t>::iterator heartbeat_record = missed_heartbeat_counts_.find(peer); + auto it = missed_heartbeat_counts_.find(peer); + if (it == missed_heartbeat_counts_.end()) return UNKNOWN; + return ComputePeerState(it->second); +} - if (heartbeat_record == missed_heartbeat_counts_.end()) { - return UNKNOWN; - } else if (heartbeat_record->second > max_missed_heartbeats_) { +FailureDetector::PeerState MissedHeartbeatFailureDetector::ComputePeerState( + int32_t missed_heatbeat_count) { + if (missed_heatbeat_count > max_missed_heartbeats_) { return FAILED; - } else if (heartbeat_record->second > suspect_missed_heartbeats_) { + } else if (missed_heatbeat_count > suspect_missed_heartbeats_) { return SUSPECTED; } - return OK; } http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/failure-detector.h b/be/src/statestore/failure-detector.h index c3f504e..5962d06 100644 --- a/be/src/statestore/failure-detector.h +++ b/be/src/statestore/failure-detector.h @@ -126,6 +126,9 @@ class MissedHeartbeatFailureDetector : 
public FailureDetector { virtual void EvictPeer(const std::string& peer); private: + /// Computes the PeerState from the missed heartbeat count. + PeerState ComputePeerState(int32_t missed_heatbeat_count); + /// Protects all members boost::mutex lock_; http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index a951414..f9e61cf 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -532,6 +532,28 @@ class TestStatestore(): .wait_for_failure(timeout=60) ) + def test_intermittent_hung_heartbeats(self): + """Heartbeats that occasionally time out should not cause a failure to be detected.""" + heartbeat_count = [0] # Use array to allow mutating from inside callback. + + def heartbeat_cb(sub, args): + heartbeat_count[0] += 1 + # Delay every second heartbeat. + if (heartbeat_count[0] % 2 == 1): + time.sleep(4) + return Subscriber.THeartbeatResponse() + + with StatestoreSubscriber(heartbeat_cb=heartbeat_cb) as sub: + topic_name = "test_intermittent_hung_heartbeats" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + ( + sub.start() + .register(topics=[reg]) + .wait_for_update(topic_name, 30) + .kill() + .wait_for_failure() + ) + def test_slow_subscriber(self): """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random interval between 1 and 9 seconds, this lets the heartbeats fail without removing the
