IMPALA-6644: Add last heartbeat timestamp into Statestore metric

After this patch, the statestore tracks the time elapsed since the last successful heartbeat to each subscriber. This value is exposed as a per-subscriber metric ('secs_since_heartbeat') on the statestore's subscribers debug page. The patch also adds a monitoring thread that periodically checks the last heartbeat timestamp of every subscriber and logs the IDs of those whose timestamp has not been updated within the last monitoring interval.
Testing: Added an end-to-end test to verify the 'secs_since_heartbeat' metric of a slow subscriber. Change-Id: I754adccc4569e8219d5d01500cccdfc8782953f7 Reviewed-on: http://gerrit.cloudera.org:8080/11052 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8692bfbe Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8692bfbe Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8692bfbe Branch: refs/heads/master Commit: 8692bfbef657fe95da68e9dcaca9b49de331ccc3 Parents: 8848588 Author: poojanilangekar <[email protected]> Authored: Tue Jul 24 18:01:21 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Aug 29 22:06:00 2018 +0000 ---------------------------------------------------------------------- be/src/statestore/statestore.cc | 55 ++++++++++++++++++++++++++++++-- be/src/statestore/statestore.h | 30 ++++++++++++++++- tests/statestore/test_statestore.py | 20 ++++++++++++ www/statestore_subscribers.tmpl | 2 ++ 4 files changed, 104 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 8749825..4e63dad 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -75,6 +75,8 @@ DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads " send heartbeats in parallel to all registered subscribers."); DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with" " which the statestore sends heartbeat heartbeats to subscribers."); +DEFINE_double_hidden(heartbeat_monitoring_frequency_ms, 60000, 
"(Advanced) Frequency (in " + "ms) with which the statestore monitors heartbeats from a subscriber."); DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running"); @@ -315,6 +317,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id, : subscriber_id_(subscriber_id), registration_id_(registration_id), network_address_(network_address) { + RefreshLastHeartbeatTimestamp(); for (const TTopicRegistration& topic : subscribed_topics) { GetTopicsMapForId(topic.topic_name) ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name), @@ -388,6 +391,11 @@ void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i topic_it->second.last_version.Store(version); } +void Statestore::Subscriber::RefreshLastHeartbeatTimestamp() { + DCHECK_GE(MonotonicMillis(), last_heartbeat_ts_.Load()); + last_heartbeat_ts_.Store(MonotonicMillis()); +} + Statestore::Statestore(MetricGroup* metrics) : subscriber_topic_update_threadpool_("statestore-update", "subscriber-update-worker", @@ -419,7 +427,6 @@ Statestore::Statestore(MetricGroup* metrics) failure_detector_(new MissedHeartbeatFailureDetector( FLAGS_statestore_max_missed_heartbeats, FLAGS_statestore_max_missed_heartbeats / 2)) { - DCHECK(metrics != NULL); metrics_ = metrics; num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0); @@ -440,6 +447,10 @@ Statestore::Statestore(MetricGroup* metrics) heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat"); } +Statestore::~Statestore() { + CHECK(!initialized_) << "Cannot shutdown Statestore once initialized."; +} + Status Statestore::Init(int32_t state_store_port) { boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface())); boost::shared_ptr<TProcessorEventHandler> event_handler( @@ -464,6 +475,9 @@ Status Statestore::Init(int32_t state_store_port) { RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init()); 
RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init()); RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init()); + RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread", + &Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_)); + initialized_ = true; return Status::OK(); } @@ -536,6 +550,12 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args, document->GetAllocator()); sub_json.AddMember("registration_id", registration_id, document->GetAllocator()); + Value secs_since_heartbeat( + StringPrintf("%.3f", subscriber.second->SecondsSinceHeartbeat()).c_str(), + document->GetAllocator()); + sub_json.AddMember( + "secs_since_heartbeat", secs_since_heartbeat, document->GetAllocator()); + subscribers.PushBack(sub_json, document->GetAllocator()); } document->AddMember("subscribers", subscribers, document->GetAllocator()); @@ -898,7 +918,9 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id, Status status; if (is_heartbeat) { status = SendHeartbeat(subscriber.get()); - if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) { + if (status.ok()) { + subscriber->RefreshLastHeartbeatTimestamp(); + } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) { // Add details to status to make it more useful, while preserving the stack status.AddDetail(Substitute( "Subscriber $0 timed-out during heartbeat RPC. 
Timeout is $1s.", @@ -968,6 +990,35 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id, } } +[[noreturn]] void Statestore::MonitorSubscriberHeartbeat() { + while (1) { + int num_subscribers; + vector<SubscriberId> inactive_subscribers; + SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms); + { + lock_guard<mutex> l(subscribers_lock_); + num_subscribers = subscribers_.size(); + for (const auto& subscriber : subscribers_) { + if (subscriber.second->SecondsSinceHeartbeat() * 1000.0 + > FLAGS_heartbeat_monitoring_frequency_ms) { + inactive_subscribers.push_back(subscriber.second->id()); + } + } + } + if (inactive_subscribers.empty()) { + LOG(INFO) << "All " << num_subscribers + << " subscribers successfully heartbeat in the last " + << FLAGS_heartbeat_monitoring_frequency_ms << "ms."; + } else { + int num_active_subscribers = num_subscribers - inactive_subscribers.size(); + LOG(WARNING) << num_active_subscribers << "/" << num_subscribers + << " subscribers successfully heartbeat in the last " + << FLAGS_heartbeat_monitoring_frequency_ms << "ms." + << " Slow subscribers: " << boost::join(inactive_subscribers, ", "); + } + } +} + void Statestore::UnregisterSubscriber(Subscriber* subscriber) { SubscriberMap::const_iterator it = subscribers_.find(subscriber->id()); if (it == subscribers_.end() || http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 9326492..52f7d68 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -18,6 +18,7 @@ #ifndef STATESTORE_STATESTORE_H #define STATESTORE_STATESTORE_H +#include <atomic> #include <cstdint> #include <map> #include <memory> @@ -130,6 +131,9 @@ class Statestore : public CacheLineAligned { /// The only constructor; initialises member variables only.
Statestore(MetricGroup* metrics); + /// Destructor, should not be called once the Statestore is initialized. + ~Statestore(); + /// Initialize and start the backing ThriftServer with port 'state_store_port'. /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if /// any of the above initialization fails. @@ -150,7 +154,7 @@ class Statestore : public CacheLineAligned { void RegisterWebpages(Webserver* webserver); - /// The main processing loop. Blocks until the exit flag is set. + /// The main processing loop. Runs infinitely. void MainLoop(); /// Returns the Thrift API interface that proxies requests onto the local Statestore. @@ -384,6 +388,12 @@ class Statestore : public CacheLineAligned { const SubscriberId& id() const { return subscriber_id_; } const RegistrationId& registration_id() const { return registration_id_; } + /// Returns the time elapsed (in seconds) since the last heartbeat. + double SecondsSinceHeartbeat() const { + return (static_cast<double>(MonotonicMillis() - last_heartbeat_ts_.Load())) + / 1000.0; + } + /// Get the Topics map that would be used to store 'topic_id'. const Topics& GetTopicsMapForId(const TopicId& topic_id) const { return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_ @@ -427,6 +437,9 @@ class Statestore : public CacheLineAligned { void SetLastTopicVersionProcessed(const TopicId& topic_id, TopicEntry::Version version); + /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time. + void RefreshLastHeartbeatTimestamp(); + private: /// Unique human-readable identifier for this subscriber, set by the subscriber itself /// on a Register call. @@ -449,6 +462,10 @@ class Statestore : public CacheLineAligned { Topics priority_subscribed_topics_; Topics non_priority_subscribed_topics_; + /// The timestamp of the last successful heartbeat in milliseconds. A timestamp much + /// older than the heartbeat frequency implies an unresponsive subscriber. 
+ AtomicInt64 last_heartbeat_ts_{0}; + /// Lock held when adding or deleting transient entries. See class comment for lock /// acquisition order. boost::mutex transient_entry_lock_; @@ -534,6 +551,12 @@ class Statestore : public CacheLineAligned { ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_; + /// Thread that monitors the heartbeats of all subscribers. + std::unique_ptr<Thread> heartbeat_monitoring_thread_; + + /// Flag to indicate that the statestore has been initialized. + bool initialized_ = false; + /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per /// subscriber should be used, but the cache helps with the client lifecycle on failure. boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_; @@ -683,6 +706,11 @@ class Statestore : public CacheLineAligned { void SubscribersHandler(const Webserver::ArgumentMap& args, rapidjson::Document* document); + /// Monitors the heartbeats of all subscribers every + /// FLAGS_heartbeat_monitoring_frequency_ms milliseconds. If a subscriber's + /// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's + /// id. 
+ [[noreturn]] void MonitorSubscriberHeartbeat(); }; } http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index 8f26b63..23f8aa8 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -18,6 +18,7 @@ from collections import defaultdict import json import logging +from random import randint import socket import threading import traceback @@ -524,6 +525,25 @@ class TestStatestore(): .wait_for_failure(timeout=60) ) + def test_slow_subscriber(self): + """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random + interval between 1 and 9 seconds; this lets the heartbeats fail without removing the + subscriber from the set of active subscribers. It then checks the subscribers page + of the statestore to ensure that the 'secs_since_heartbeat' field is updated with an + acceptable value. 
Since the statestore heartbeats at 1 second intervals, an acceptable + value would be between ((sleep_time-1.0), (sleep_time+1.0)).""" + sub = StatestoreSubscriber() + sub.start().register().wait_for_heartbeat(1) + sub.kill() + sleep_time = randint(1, 9) + time.sleep(sleep_time) + subscribers = get_statestore_subscribers()["subscribers"] + matches = [s for s in subscribers if str(s["id"]) == sub.subscriber_id] + assert len(matches) == 1, "Subscriber %s not listed" % sub.subscriber_id + secs_since_heartbeat = float(matches[0]["secs_since_heartbeat"]) + assert (secs_since_heartbeat > float(sleep_time - 1.0)) + assert (secs_since_heartbeat < float(sleep_time + 1.0)) + def test_topic_persistence(self): """Test that persistent topic entries survive subscriber failure, but transent topic entries are erased when the associated subscriber fails""" http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/www/statestore_subscribers.tmpl ---------------------------------------------------------------------- diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl index f57b4f6..77b07dc 100644 --- a/www/statestore_subscribers.tmpl +++ b/www/statestore_subscribers.tmpl @@ -28,6 +28,7 @@ under the License. <th>Subscribed priority topics</th> <th>Transient entries</th> <th>Registration ID</th> + <th>Seconds since last heartbeat</th> </tr> {{#subscribers}} @@ -38,6 +39,7 @@ under the License. <td>{{num_priority_topics}}</td> <td>{{num_transient}}</td> <td>{{registration_id}}</td> + <td>{{secs_since_heartbeat}}</td> </tr> {{/subscribers}}
