IMPALA-6816: minimise calls to GetMinSubscriberTopicVersion()

min_subscriber_topic_version is expensive to compute (it requires
iterating over all subscribers) but is only used by one
subscriber/topic pair: Impalads receiving catalog topic updates.

This patch implements a simple fix - only compute it if a subscriber
asks for it. A more complex alternative would be to maintain
a priority queue of subscriber versions, but that didn't seem worth
the complexity and risk of bugs.

Testing:
Add a statestore test to validate the versions. It looks like we had a
pre-existing test gap for validating min_subscriber_topic_version so
the test is mainly focused on adding that coverage.

Ran core tests with DEBUG and ASAN.

Change-Id: I8ee7cb2355ba1049b9081e0df344ac41aa4ebeb1
Reviewed-on: http://gerrit.cloudera.org:8080/10705
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/888dc82f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/888dc82f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/888dc82f

Branch: refs/heads/master
Commit: 888dc82ff70afa227d594087806354ad60e0d915
Parents: aebec1c
Author: Tim Armstrong <[email protected]>
Authored: Mon Jun 11 17:22:35 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Jun 26 23:55:38 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc           |  2 +-
 be/src/scheduling/admission-controller.cc  |  9 +--
 be/src/scheduling/scheduler.cc             |  2 +-
 be/src/service/impala-server.cc            | 11 +--
 be/src/statestore/statestore-subscriber.cc |  7 +-
 be/src/statestore/statestore-subscriber.h  |  6 +-
 be/src/statestore/statestore.cc            | 50 +++++++------
 be/src/statestore/statestore.h             |  8 ++-
 common/thrift/StatestoreService.thrift     |  6 ++
 tests/statestore/test_statestore.py        | 93 +++++++++++++++++++++++++
 10 files changed, 160 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index e74db75..a17478f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -186,7 +186,7 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, 
_2);
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, 
false, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index a1c6c77..3960528 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -18,7 +18,6 @@
 #include "scheduling/admission-controller.h"
 
 #include <boost/algorithm/string.hpp>
-#include <boost/bind.hpp>
 #include <boost/mem_fn.hpp>
 #include <gutil/strings/substitute.h>
 
@@ -243,9 +242,11 @@ AdmissionController::~AdmissionController() {
 Status AdmissionController::Init() {
   RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
       &AdmissionController::DequeueLoop, this, &dequeue_thread_));
-  StatestoreSubscriber::UpdateCallback cb =
-    bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = 
subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  auto cb = [this](
+      const StatestoreSubscriber::TopicDeltaMap& state,
+      vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, 
topic_updates); };
+  Status status =
+      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, 
false, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue 
topic");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5772c9a..7c0af6f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -88,7 +88,7 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
     Status status = statestore_subscriber_->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c5d0308..e9ddbe4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -356,12 +356,12 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
 
   // Register the membership callback if running in a real cluster.
   if (!TestInfo::is_test()) {
-    auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
-         vector<TTopicDelta>* topic_updates) {
+    auto cb = [this](const StatestoreSubscriber::TopicDeltaMap& state,
+        vector<TTopicDelta>* topic_updates) {
       this->MembershipCallback(state, topic_updates);
     };
-    ABORT_IF_ERROR(
-        exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, 
true, cb));
+    ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb));
 
     if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& 
state,
@@ -369,7 +369,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
         this->CatalogUpdateCallback(state, topic_updates);
       };
       ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-            CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb));
+          CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb));
     }
   }
 
@@ -1517,6 +1517,7 @@ void ImpalaServer::CatalogUpdateCallback(
   // Always update the minimum subscriber version for the catalog topic.
   {
     unique_lock<mutex> unique_lock(catalog_version_lock_);
+    DCHECK(delta.__isset.min_subscriber_topic_version);
     min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
   }
   catalog_version_update_cv_.NotifyAll();

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index 7fb113e..443a0e5 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -138,7 +138,8 @@ StatestoreSubscriber::StatestoreSubscriber(const 
std::string& subscriber_id,
 }
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
-    bool is_transient, const UpdateCallback& callback) {
+    bool is_transient, bool populate_min_subscriber_topic_version,
+    const UpdateCallback& callback) {
   lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new 
topic");
   TopicRegistration& registration = topic_registrations_[topic_id];
@@ -151,6 +152,8 @@ Status StatestoreSubscriber::AddTopic(const 
Statestore::TopicId& topic_id,
     registration.update_interval_timer.Start();
   }
   registration.is_transient = is_transient;
+  registration.populate_min_subscriber_topic_version =
+      populate_min_subscriber_topic_version;
   return Status::OK();
 }
 
@@ -164,6 +167,8 @@ Status StatestoreSubscriber::Register() {
     TTopicRegistration thrift_topic;
     thrift_topic.topic_name = registration.first;
     thrift_topic.is_transient = registration.second.is_transient;
+    thrift_topic.populate_min_subscriber_topic_version =
+        registration.second.populate_min_subscriber_topic_version;
     request.topic_registrations.push_back(thrift_topic);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h 
b/be/src/statestore/statestore-subscriber.h
index 554583d..05d4489 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,7 +112,7 @@ class StatestoreSubscriber {
   /// Must be called before Start(), in which case it will return
   /// Status::OK. Otherwise an error will be returned.
   Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
-      const UpdateCallback& callback);
+      bool populate_min_subscriber_topic_version, const UpdateCallback& 
callback);
 
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
@@ -216,6 +216,10 @@ class StatestoreSubscriber {
     /// it makes will be deleted upon failure or disconnection.
     bool is_transient = false;
 
+    /// Whether this subscriber needs the min_subscriber_topic_version field 
to be filled
+    /// in on updates.
+    bool populate_min_subscriber_topic_version = false;
+
     /// The last version of the topic this subscriber processed.
     /// -1 if no updates have been processed yet.
     int64_t current_topic_version = -1;

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 90ea167..a58aec1 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -300,12 +300,14 @@ void Statestore::Topic::ToJson(Document* document, Value* 
topic_json) {
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, const TNetworkAddress& 
network_address,
     const vector<TTopicRegistration>& subscribed_topics)
-    : subscriber_id_(subscriber_id),
-      registration_id_(registration_id),
-      network_address_(network_address) {
-  for (const TTopicRegistration& topic: subscribed_topics) {
-    GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
-        forward_as_tuple(topic.topic_name), 
forward_as_tuple(topic.is_transient));
+  : subscriber_id_(subscriber_id),
+    registration_id_(registration_id),
+    network_address_(network_address) {
+  for (const TTopicRegistration& topic : subscribed_topics) {
+    GetTopicsMapForId(topic.topic_name)
+        ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
+            forward_as_tuple(
+                topic.is_transient, 
topic.populate_min_subscriber_topic_version));
   }
 }
 
@@ -697,18 +699,22 @@ Status Statestore::SendTopicUpdate(Subscriber* 
subscriber, UpdateKind update_kin
   return Status::OK();
 }
 
-void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
-    UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
+void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind 
update_kind,
+    TUpdateStateRequest* update_state_request) {
+  DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+      || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
+      << static_cast<int>(update_kind);
+  // Deltas in update_state_request->topic_deltas for which we need to populate
+  // 'min_subscriber_topic_version'. GetMinSubscriberTopicVersion() is somewhat
+  // expensive so we want to avoid calling it unless necessary.
+  vector<TTopicDelta*> deltas_needing_min_version;
   {
-    DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
-        || update_kind ==  UpdateKind::PRIORITY_TOPIC_UPDATE)
-        << static_cast<int>(update_kind);
     const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
-    const Subscriber::Topics& subscribed_topics = is_priority
-        ? subscriber.priority_subscribed_topics()
-        : subscriber.non_priority_subscribed_topics();
+    const Subscriber::Topics& subscribed_topics = is_priority ?
+        subscriber.priority_subscribed_topics() :
+        subscriber.non_priority_subscribed_topics();
     shared_lock<shared_mutex> l(topics_map_lock_);
-    for (const auto& subscribed_topic: subscribed_topics) {
+    for (const auto& subscribed_topic : subscribed_topics) {
       auto topic_it = topics_.find(subscribed_topic.first);
       DCHECK(topic_it != topics_.end());
       TopicEntry::Version last_processed_version =
@@ -718,16 +724,20 @@ void Statestore::GatherTopicUpdates(const Subscriber& 
subscriber,
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
       topic_it->second.BuildDelta(subscriber.id(), last_processed_version, 
&topic_delta);
+      if (subscribed_topic.second.populate_min_subscriber_topic_version) {
+        deltas_needing_min_version.push_back(&topic_delta);
+      }
     }
   }
 
   // Fill in the min subscriber topic version. This must be done after 
releasing
   // topics_map_lock_.
-  lock_guard<mutex> l(subscribers_lock_);
-  typedef map<TopicId, TTopicDelta> TopicDeltaMap;
-  for (TopicDeltaMap::value_type& topic_delta: 
update_state_request->topic_deltas) {
-    topic_delta.second.__set_min_subscriber_topic_version(
-        GetMinSubscriberTopicVersion(topic_delta.first));
+  if (!deltas_needing_min_version.empty()) {
+    lock_guard<mutex> l(subscribers_lock_);
+    for (TTopicDelta* delta : deltas_needing_min_version) {
+      delta->__set_min_subscriber_topic_version(
+          GetMinSubscriberTopicVersion(delta->topic_name));
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index edaef49..71e1ade 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -331,11 +331,17 @@ class Statestore : public CacheLineAligned {
 
     /// Information about a subscriber's subscription to a specific topic.
     struct TopicSubscription {
-      TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+      TopicSubscription(bool is_transient, bool 
populate_min_subscriber_topic_version)
+        : is_transient(is_transient),
+          
populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {}
 
       /// Whether entries written by this subscriber should be considered 
transient.
       const bool is_transient;
 
+      /// Whether min_subscriber_topic_version needs to be filled in for this
+      /// subscription.
+      const bool populate_min_subscriber_topic_version;
+
       /// The last topic entry version successfully processed by this 
subscriber. Only
       /// written by a single thread at a time but can be read concurrently.
       AtomicInt64 last_version{TOPIC_INITIAL_VERSION};

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 4f2dada..783bea7 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -139,6 +139,12 @@ struct TTopicRegistration {
   // True if updates to this topic from this subscriber should be removed upon 
the
   // subscriber's failure or disconnection
   2: required bool is_transient;
+
+  // If true, min_subscriber_topic_version is computed and set in topic 
updates sent
+  // to this subscriber. Should only be set to true if this is
+  // actually required - computing the version is relatively expensive 
compared to
+  // other aspects of preparing topic updates - see IMPALA-6816.
+  3: required bool populate_min_subscriber_topic_version = false;
 }
 
 struct TRegisterSubscriberRequest {

http://git-wip-us.apache.org/repos/asf/impala/blob/888dc82f/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py 
b/tests/statestore/test_statestore.py
index 2a7c2f9..698a2a6 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -590,3 +590,96 @@ class TestStatestore():
     sub.register(topics=[reg])
     LOG.info("Re-registered with id {0}, waiting for 
update".format(sub.subscriber_id))
     sub.wait_for_update(topic_name, target_updates)
+
+  def test_min_subscriber_topic_version(self):
+    self._do_test_min_subscriber_topic_version(False)
+
+  def test_min_subscriber_topic_version_with_straggler(self):
+    self._do_test_min_subscriber_topic_version(True)
+
+  def _do_test_min_subscriber_topic_version(self, simulate_straggler):
+    """Implementation of test that the 'min_subscriber_topic_version' flag is 
correctly
+    set when requested. This test runs two subscribers concurrently and tracks the
+    minimum version each has processed. If 'simulate_straggler' is true, one 
subscriber
+    rejects updates so that its version is not advanced."""
+    topic_name = "test_min_subscriber_topic_version_%s" % uuid.uuid1()
+
+    # This lock is held while processing the update to protect 
last_to_versions.
+    update_lock = threading.Lock()
+    last_to_versions = {}
+    TOTAL_SUBSCRIBERS = 2
+    def callback(sub, args, is_producer, sub_name):
+      """Callback for subscriber to verify min_subscriber_topic_version 
behaviour.
+      If 'is_producer' is true, this acts as the producer, otherwise it acts 
as the
+      consumer. 'sub_name' is a name used to index into last_to_versions."""
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      with update_lock:
+        LOG.info("{0} got update {1}".format(sub_name,
+            repr(args.topic_deltas[topic_name])))
+        LOG.info("Versions: {0}".format(last_to_versions))
+        to_version = args.topic_deltas[topic_name].to_version
+        from_version = args.topic_deltas[topic_name].from_version
+        min_subscriber_topic_version = \
+            args.topic_deltas[topic_name].min_subscriber_topic_version
+
+        if is_producer:
+          assert min_subscriber_topic_version is not None
+          assert (to_version == 0 and min_subscriber_topic_version == 0) or\
+              min_subscriber_topic_version < to_version,\
+              "'to_version' hasn't been created yet by this subscriber."
+          # Only validate version once all subscribers have processed an 
update.
+          if len(last_to_versions) == TOTAL_SUBSCRIBERS:
+            min_to_version = min(last_to_versions.values())
+            assert min_subscriber_topic_version <= min_to_version,\
+                "The minimum subscriber topic version seen by the producer 
cannot get " +\
+                "ahead of the minimum version seem by the consumer, by 
definition."
+            assert min_subscriber_topic_version >= min_to_version - 2,\
+                "The min topic version can be two behind the last version seen 
by " + \
+                "this subscriber because the updates for both subscribers are 
" + \
+                "prepared in parallel and because it's possible that the 
producer " + \
+                "processes two updates in-between consumer updates. This is 
not " + \
+                "absolute but depends on updates not being delayed a large 
amount."
+        else:
+          # Consumer did not request topic version.
+          assert min_subscriber_topic_version is None
+
+        # Check the 'to_version' and update 'last_to_versions'.
+        last_to_version = last_to_versions.get(sub_name, 0)
+        if to_version > 0:
+          # Non-empty update.
+          assert from_version == last_to_version
+        # Stragglers should accept the first update then skip later ones.
+        skip_update = simulate_straggler and not is_producer and 
last_to_version > 0
+        if not skip_update: last_to_versions[sub_name] = to_version
+
+        if is_producer:
+          delta = self.make_topic_update(topic_name)
+          return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
+                                      skipped=False)
+        elif skip_update:
+          return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], 
skipped=True)
+        else:
+          return DEFAULT_UPDATE_STATE_RESPONSE
+
+    # Two concurrent subscribers: one which pushes out updates and checks the minimum
+    # version, the other which just consumes the updates.
+    def producer_callback(sub, args): return callback(sub, args, True, 
"producer")
+    def consumer_callback(sub, args): return callback(sub, args, False, 
"consumer")
+    consumer_sub = StatestoreSubscriber(update_cb=consumer_callback)
+    consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+    producer_sub = StatestoreSubscriber(update_cb=producer_callback)
+    producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
+        populate_min_subscriber_topic_version=True)
+    NUM_UPDATES = 6
+    (
+      consumer_sub.start()
+          .register(topics=[consumer_reg])
+    )
+    (
+      producer_sub.start()
+          .register(topics=[producer_reg])
+          .wait_for_update(topic_name, NUM_UPDATES)
+    )
+    consumer_sub.wait_for_update(topic_name, NUM_UPDATES)

Reply via email to