IMPALA-6816: minimise calls to GetMinSubscriberTopicVersion(). min_subscriber_topic_version is expensive to compute (it requires iterating over all subscribers) but is only used by one subscriber/topic pair: Impalads receiving catalog topic updates.
This patch implements a simple fix - only compute it if a subscriber asks for it. A more complex alternative would be to maintain a priority queue of subscriber versions, but that didn't seem worth the complexity and risk of bugs. Testing: Add a statestore test to validate the versions. It looks like we had a pre-existing test gap for validating min_subscriber_topic_version, so the test is mainly focused on adding that coverage. Ran core tests with DEBUG and ASAN. Change-Id: I8ee7cb2355ba1049b9081e0df344ac41aa4ebeb1 Reviewed-on: http://gerrit.cloudera.org:8080/10705 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/888dc82f Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/888dc82f Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/888dc82f Branch: refs/heads/master Commit: 888dc82ff70afa227d594087806354ad60e0d915 Parents: aebec1c Author: Tim Armstrong <[email protected]> Authored: Mon Jun 11 17:22:35 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Jun 26 23:55:38 2018 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog-server.cc | 2 +- be/src/scheduling/admission-controller.cc | 9 +-- be/src/scheduling/scheduler.cc | 2 +- be/src/service/impala-server.cc | 11 +-- be/src/statestore/statestore-subscriber.cc | 7 +- be/src/statestore/statestore-subscriber.h | 6 +- be/src/statestore/statestore.cc | 50 +++++++------ be/src/statestore/statestore.h | 8 ++- common/thrift/StatestoreService.thrift | 6 ++ tests/statestore/test_statestore.py | 93 +++++++++++++++++++++++++ 10 files changed, 160 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/catalog/catalog-server.cc 
---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index e74db75..a17478f 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -186,7 +186,7 @@ Status CatalogServer::Start() { StatestoreSubscriber::UpdateCallback cb = bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2); - status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb); + status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, false, cb); if (!status.ok()) { status.AddDetail("CatalogService failed to start"); return status; http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index a1c6c77..3960528 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -18,7 +18,6 @@ #include "scheduling/admission-controller.h" #include <boost/algorithm/string.hpp> -#include <boost/bind.hpp> #include <boost/mem_fn.hpp> #include <gutil/strings/substitute.h> @@ -243,9 +242,11 @@ AdmissionController::~AdmissionController() { Status AdmissionController::Init() { RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread", &AdmissionController::DequeueLoop, this, &dequeue_thread_)); - StatestoreSubscriber::UpdateCallback cb = - bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2); - Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb); + auto cb = [this]( + const StatestoreSubscriber::TopicDeltaMap& state, + vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); }; + Status status = + subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, false, cb); if (!status.ok()) { 
status.AddDetail("AdmissionController failed to register request queue topic"); } http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 5772c9a..7c0af6f 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -88,7 +88,7 @@ Status Scheduler::Init(const TNetworkAddress& backend_address, StatestoreSubscriber::UpdateCallback cb = bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2); Status status = statestore_subscriber_->AddTopic( - Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb); + Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb); if (!status.ok()) { status.AddDetail("Scheduler failed to register membership topic"); return status; http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index c5d0308..e9ddbe4 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -356,12 +356,12 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) // Register the membership callback if running in a real cluster. 
if (!TestInfo::is_test()) { - auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, - vector<TTopicDelta>* topic_updates) { + auto cb = [this](const StatestoreSubscriber::TopicDeltaMap& state, + vector<TTopicDelta>* topic_updates) { this->MembershipCallback(state, topic_updates); }; - ABORT_IF_ERROR( - exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb)); + ABORT_IF_ERROR(exec_env->subscriber()->AddTopic( + Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb)); if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) { auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, @@ -369,7 +369,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) this->CatalogUpdateCallback(state, topic_updates); }; ABORT_IF_ERROR(exec_env->subscriber()->AddTopic( - CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb)); + CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb)); } } @@ -1517,6 +1517,7 @@ void ImpalaServer::CatalogUpdateCallback( // Always update the minimum subscriber version for the catalog topic. 
{ unique_lock<mutex> unique_lock(catalog_version_lock_); + DCHECK(delta.__isset.min_subscriber_topic_version); min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version; } catalog_version_update_cv_.NotifyAll(); http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore-subscriber.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 7fb113e..443a0e5 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -138,7 +138,8 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id, } Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id, - bool is_transient, const UpdateCallback& callback) { + bool is_transient, bool populate_min_subscriber_topic_version, + const UpdateCallback& callback) { lock_guard<shared_mutex> exclusive_lock(lock_); if (is_registered_) return Status("Subscriber already started, can't add new topic"); TopicRegistration& registration = topic_registrations_[topic_id]; @@ -151,6 +152,8 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id, registration.update_interval_timer.Start(); } registration.is_transient = is_transient; + registration.populate_min_subscriber_topic_version = + populate_min_subscriber_topic_version; return Status::OK(); } @@ -164,6 +167,8 @@ Status StatestoreSubscriber::Register() { TTopicRegistration thrift_topic; thrift_topic.topic_name = registration.first; thrift_topic.is_transient = registration.second.is_transient; + thrift_topic.populate_min_subscriber_topic_version = + registration.second.populate_min_subscriber_topic_version; request.topic_registrations.push_back(thrift_topic); } http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore-subscriber.h 
---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h index 554583d..05d4489 100644 --- a/be/src/statestore/statestore-subscriber.h +++ b/be/src/statestore/statestore-subscriber.h @@ -112,7 +112,7 @@ class StatestoreSubscriber { /// Must be called before Start(), in which case it will return /// Status::OK. Otherwise an error will be returned. Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient, - const UpdateCallback& callback); + bool populate_min_subscriber_topic_version, const UpdateCallback& callback); /// Registers this subscriber with the statestore, and starts the /// heartbeat service, as well as a thread to check for failure and @@ -216,6 +216,10 @@ class StatestoreSubscriber { /// it makes will be deleted upon failure or disconnection. bool is_transient = false; + /// Whether this subscriber needs the min_subscriber_topic_version field to be filled + /// in on updates. + bool populate_min_subscriber_topic_version = false; + /// The last version of the topic this subscriber processed. /// -1 if no updates have been processed yet. 
int64_t current_topic_version = -1; http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 90ea167..a58aec1 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -300,12 +300,14 @@ void Statestore::Topic::ToJson(Document* document, Value* topic_json) { Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id, const RegistrationId& registration_id, const TNetworkAddress& network_address, const vector<TTopicRegistration>& subscribed_topics) - : subscriber_id_(subscriber_id), - registration_id_(registration_id), - network_address_(network_address) { - for (const TTopicRegistration& topic: subscribed_topics) { - GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct, - forward_as_tuple(topic.topic_name), forward_as_tuple(topic.is_transient)); + : subscriber_id_(subscriber_id), + registration_id_(registration_id), + network_address_(network_address) { + for (const TTopicRegistration& topic : subscribed_topics) { + GetTopicsMapForId(topic.topic_name) + ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name), + forward_as_tuple( + topic.is_transient, topic.populate_min_subscriber_topic_version)); } } @@ -697,18 +699,22 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kin return Status::OK(); } -void Statestore::GatherTopicUpdates(const Subscriber& subscriber, - UpdateKind update_kind, TUpdateStateRequest* update_state_request) { +void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind, + TUpdateStateRequest* update_state_request) { + DCHECK(update_kind == UpdateKind::TOPIC_UPDATE + || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE) + << static_cast<int>(update_kind); + // Pointers to the entries in update_state_request->topic_deltas where we need to populate + // 
'min_subscriber_topic_version'. GetMinSubscriberTopicVersion() is somewhat + // expensive so we want to avoid calling it unless necessary. + vector<TTopicDelta*> deltas_needing_min_version; { - DCHECK(update_kind == UpdateKind::TOPIC_UPDATE - || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE) - << static_cast<int>(update_kind); const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE; - const Subscriber::Topics& subscribed_topics = is_priority - ? subscriber.priority_subscribed_topics() - : subscriber.non_priority_subscribed_topics(); + const Subscriber::Topics& subscribed_topics = is_priority ? + subscriber.priority_subscribed_topics() : + subscriber.non_priority_subscribed_topics(); shared_lock<shared_mutex> l(topics_map_lock_); - for (const auto& subscribed_topic: subscribed_topics) { + for (const auto& subscribed_topic : subscribed_topics) { auto topic_it = topics_.find(subscribed_topic.first); DCHECK(topic_it != topics_.end()); TopicEntry::Version last_processed_version = @@ -718,16 +724,20 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber, update_state_request->topic_deltas[subscribed_topic.first]; topic_delta.topic_name = subscribed_topic.first; topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta); + if (subscribed_topic.second.populate_min_subscriber_topic_version) { + deltas_needing_min_version.push_back(&topic_delta); + } } } // Fill in the min subscriber topic version. This must be done after releasing // topics_map_lock_. 
- lock_guard<mutex> l(subscribers_lock_); - typedef map<TopicId, TTopicDelta> TopicDeltaMap; - for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) { - topic_delta.second.__set_min_subscriber_topic_version( - GetMinSubscriberTopicVersion(topic_delta.first)); + if (!deltas_needing_min_version.empty()) { + lock_guard<mutex> l(subscribers_lock_); + for (TTopicDelta* delta : deltas_needing_min_version) { + delta->__set_min_subscriber_topic_version( + GetMinSubscriberTopicVersion(delta->topic_name)); + } } } http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index edaef49..71e1ade 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -331,11 +331,17 @@ class Statestore : public CacheLineAligned { /// Information about a subscriber's subscription to a specific topic. struct TopicSubscription { - TopicSubscription(bool is_transient) : is_transient(is_transient) {} + TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version) + : is_transient(is_transient), + populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {} /// Whether entries written by this subscriber should be considered transient. const bool is_transient; + /// Whether min_subscriber_topic_version needs to be filled in for this + /// subscription. + const bool populate_min_subscriber_topic_version; + /// The last topic entry version successfully processed by this subscriber. Only /// written by a single thread at a time but can be read concurrently. 
AtomicInt64 last_version{TOPIC_INITIAL_VERSION}; http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/common/thrift/StatestoreService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift index 4f2dada..783bea7 100644 --- a/common/thrift/StatestoreService.thrift +++ b/common/thrift/StatestoreService.thrift @@ -139,6 +139,12 @@ struct TTopicRegistration { // True if updates to this topic from this subscriber should be removed upon the // subscriber's failure or disconnection 2: required bool is_transient; + + // If true, min_subscriber_topic_version is computed and set in topic updates sent + // to this subscriber. Should only be set to true if this is + // actually required - computing the version is relatively expensive compared to + // other aspects of preparing topic updates - see IMPALA-6816. + 3: required bool populate_min_subscriber_topic_version = false; } struct TRegisterSubscriberRequest { http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index 2a7c2f9..698a2a6 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -590,3 +590,96 @@ class TestStatestore(): sub.register(topics=[reg]) LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id)) sub.wait_for_update(topic_name, target_updates) + + def test_min_subscriber_topic_version(self): + self._do_test_min_subscriber_topic_version(False) + + def test_min_subscriber_topic_version_with_straggler(self): + self._do_test_min_subscriber_topic_version(True) + + def _do_test_min_subscriber_topic_version(self, simulate_straggler): + """Implementation of test that the 'min_subscriber_topic_version' field is 
correctly + set when requested. This test runs two subscribers concurrently and tracks the + minimum version each has processed. If 'simulate_straggler' is true, one subscriber + rejects updates so that its version is not advanced.""" + topic_name = "test_min_subscriber_topic_version_%s" % uuid.uuid1() + + # This lock is held while processing the update to protect last_to_versions. + update_lock = threading.Lock() + last_to_versions = {} + TOTAL_SUBSCRIBERS = 2 + def callback(sub, args, is_producer, sub_name): + """Callback for subscriber to verify min_subscriber_topic_version behaviour. + If 'is_producer' is true, this acts as the producer, otherwise it acts as the + consumer. 'sub_name' is a name used to index into last_to_versions.""" + if topic_name not in args.topic_deltas: + # The update doesn't contain our topic. NOTE(review): 'pass' falls through to the + # args.topic_deltas[topic_name] lookups below, which would raise KeyError; an early + # 'return DEFAULT_UPDATE_STATE_RESPONSE' was probably intended. + pass + with update_lock: + LOG.info("{0} got update {1}".format(sub_name, + repr(args.topic_deltas[topic_name]))) + LOG.info("Versions: {0}".format(last_to_versions)) + to_version = args.topic_deltas[topic_name].to_version + from_version = args.topic_deltas[topic_name].from_version + min_subscriber_topic_version = \ + args.topic_deltas[topic_name].min_subscriber_topic_version + + if is_producer: + assert min_subscriber_topic_version is not None + assert (to_version == 0 and min_subscriber_topic_version == 0) or\ + min_subscriber_topic_version < to_version,\ + "'to_version' hasn't been created yet by this subscriber." + # Only validate version once all subscribers have processed an update. + if len(last_to_versions) == TOTAL_SUBSCRIBERS: + min_to_version = min(last_to_versions.values()) + assert min_subscriber_topic_version <= min_to_version,\ + "The minimum subscriber topic version seen by the producer cannot get " +\ + "ahead of the minimum version seem by the consumer, by definition."
+ assert min_subscriber_topic_version >= min_to_version - 2,\ + "The min topic version can be two behind the last version seen by " + \ + "this subscriber because the updates for both subscribers are " + \ + "prepared in parallel and because it's possible that the producer " + \ + "processes two updates in-between consumer updates. This is not " + \ + "absolute but depends on updates not being delayed a large amount." + else: + # Consumer did not request topic version. + assert min_subscriber_topic_version is None + + # Check the 'to_version' and update 'last_to_versions'. + last_to_version = last_to_versions.get(sub_name, 0) + if to_version > 0: + # Non-empty update. + assert from_version == last_to_version + # Stragglers should accept the first update then skip later ones. + skip_update = simulate_straggler and not is_producer and last_to_version > 0 + if not skip_update: last_to_versions[sub_name] = to_version + + if is_producer: + delta = self.make_topic_update(topic_name) + return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta], + skipped=False) + elif skip_update: + return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True) + else: + return DEFAULT_UPDATE_STATE_RESPONSE + + # Two concurrent subscribers: one pushes out updates and checks the minimum + # version, the other just consumes the updates.
+ def producer_callback(sub, args): return callback(sub, args, True, "producer") + def consumer_callback(sub, args): return callback(sub, args, False, "consumer") + consumer_sub = StatestoreSubscriber(update_cb=consumer_callback) + consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True) + producer_sub = StatestoreSubscriber(update_cb=producer_callback) + producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True, + populate_min_subscriber_topic_version=True) + NUM_UPDATES = 6 + ( + consumer_sub.start() + .register(topics=[consumer_reg]) + ) + ( + producer_sub.start() + .register(topics=[producer_reg]) + .wait_for_update(topic_name, NUM_UPDATES) + ) + consumer_sub.wait_for_update(topic_name, NUM_UPDATES)
