IMPALA-4953,IMPALA-6437: separate AC/scheduler from catalog topic updates

This adds a set of "prioritized" statestore topics that are small but
are important to deliver in a timely manner. These are delivered more
frequently by a separate thread pool to reduce the window for stale
admission control and scheduling information.

The contract between statestore and subscriber is changed so that the
statestore can send concurrent Update() RPCs for disjoint sets of
topics. This required changes to the subscriber implementation, which
assumed that only one Update RPC would arrive at a time.

It also changes the locking in the statestore so that the prioritized
update threads don't get stuck behind the catalog threads holding
'topic_lock_'. Specifically, it uses a reader-writer lock to protect
modification of the set of topics and a reader-writer lock per topic to
allow the topic data to be read by multiple threads concurrently.

Added metrics to monitor the per-topic update interval.

Testing:
Ran core tests.

Inspected metrics on Impala daemons, saw that membership and request
queue processing times had more samples recorded than the catalog
topic, reflecting the increased frequency.

Ran under thread sanitizer, made sure no data races were reported in
Statestore or StatestoreSubscriber.

Change-Id: Ifc49c2d0f2a5bfad822545616b8c62b4b95dc210
Reviewed-on: http://gerrit.cloudera.org:8080/9123
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b0d3433e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b0d3433e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b0d3433e

Branch: refs/heads/master
Commit: b0d3433e36d7942b3e10bddc310287266240810b
Parents: e117365
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Wed Jan 24 09:37:36 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Wed Feb 14 22:44:40 2018 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       |   8 +-
 be/src/scheduling/admission-controller.h        |   3 -
 be/src/scheduling/scheduler-test-util.cc        |  10 +-
 be/src/scheduling/scheduler.cc                  |   7 +-
 be/src/scheduling/scheduler.h                   |   2 -
 be/src/service/impala-server.cc                 |   6 +-
 be/src/statestore/statestore-subscriber.cc      | 219 ++++++---
 be/src/statestore/statestore-subscriber.h       | 128 ++---
 be/src/statestore/statestore.cc                 | 490 ++++++++++++-------
 be/src/statestore/statestore.h                  | 295 +++++++----
 common/thrift/metrics.json                      |  25 +-
 .../custom_cluster/test_admission_controller.py |  20 +-
 tests/statestore/test_statestore.py             | 109 +++--
 www/statestore_subscribers.tmpl                 |   2 +
 www/statestore_topics.tmpl                      |   2 +
 15 files changed, 843 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index f43af2c..640a6af 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -50,8 +50,6 @@ int64_t GetProcMemLimit() {
   return ExecEnv::GetInstance()->process_mem_tracker()->limit();
 }
 
-const string AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
-
 // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
 // "!" is used because the backend id contains a colon, but it should not contain "!".
 // When parsing the topic key we need to be careful to find the last instance in
@@ -243,7 +241,7 @@ Status AdmissionController::Init() {
       &AdmissionController::DequeueLoop, this, &dequeue_thread_));
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }
@@ -632,7 +630,7 @@ void AdmissionController::UpdatePoolStats(
     AddPoolUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-        incoming_topic_deltas.find(IMPALA_REQUEST_QUEUE_TOPIC);
+        incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
     if (topic != incoming_topic_deltas.end()) {
       const TTopicDelta& delta = topic->second;
       // Delta and non-delta updates are handled the same way, except for a full update
@@ -799,7 +797,7 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
   if (pools_for_updates_.empty()) return;
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
-  topic_delta.topic_name = IMPALA_REQUEST_QUEUE_TOPIC;
+  topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
   for (const string& pool_name: pools_for_updates_) {
     DCHECK(pool_stats_.find(pool_name) != pool_stats_.end());
     PoolStats* stats = GetPoolStats(pool_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 2830bee..3341b1b 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -206,9 +206,6 @@ class AdmissionController {
   class PoolStats;
   friend class PoolStats;
 
-  /// Statestore topic name.
-  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
-
   /// Subscription manager used to handle admission control updates. This is not
   /// owned by this class.
   StatestoreSubscriber* subscriber_;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 05cfc42..7363cd3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -467,7 +467,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
 void SchedulerWrapper::AddBackend(const Host& host) {
   // Add to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   AddHostToTopicDelta(host, &delta);
   SendTopicDelta(delta);
@@ -476,7 +476,7 @@ void SchedulerWrapper::AddBackend(const Host& host) {
 void SchedulerWrapper::RemoveBackend(const Host& host) {
   // Add deletion to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   TTopicItem item;
   item.__set_deleted(true);
@@ -487,7 +487,7 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
 
 void SchedulerWrapper::SendFullMembershipMap() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = false;
   for (const Host& host : plan_.cluster().hosts()) {
     if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
@@ -497,7 +497,7 @@ void SchedulerWrapper::SendFullMembershipMap() {
 
 void SchedulerWrapper::SendEmptyUpdate() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   SendTopicDelta(delta);
 }
@@ -547,7 +547,7 @@ void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
   DCHECK(scheduler_ != nullptr);
   // Wrap in topic delta map.
   StatestoreSubscriber::TopicDeltaMap delta_map;
-  delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
+  delta_map.emplace(Statestore::IMPALA_MEMBERSHIP_TOPIC, delta);
 
   // Send to the scheduler.
   vector<TTopicDelta> dummy_result;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 527b5cf..5a9d4bf 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -52,8 +52,6 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
-const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
-
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
   : executors_config_(std::make_shared<const BackendConfig>()),
@@ -86,7 +84,8 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
   if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
-    Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
+    Status status = statestore_subscriber_->AddTopic(
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;
@@ -123,7 +122,7 @@ void Scheduler::UpdateMembership(
     vector<TTopicDelta>* subscriber_topic_updates) {
   // First look to see if the topic(s) we're interested in have an update
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4be2996..2d77797 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -68,8 +68,6 @@ class SchedulerWrapper;
 ///           configuration.
 class Scheduler {
  public:
-  static const std::string IMPALA_MEMBERSHIP_TOPIC;
-
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d5be4dc..3866e40 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -357,7 +357,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       this->MembershipCallback(state, topic_updates);
     };
     ABORT_IF_ERROR(
-        exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb));
+        exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
 
     if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
@@ -1482,7 +1482,7 @@ void ImpalaServer::MembershipCallback(
   // TODO: Consider rate-limiting this. In the short term, best to have
   // statestore heartbeat less frequently.
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Scheduler::IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic != incoming_topic_deltas.end()) {
     const TTopicDelta& delta = topic->second;
@@ -1607,7 +1607,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   }
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
-  update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   update.topic_entries.emplace_back(TTopicItem());
 
   TTopicItem& item = update.topic_entries.back();

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index e58c177..a27d1ae 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -22,6 +22,7 @@
 
 #include <boost/algorithm/string/join.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/lock_options.hpp>
 #include <boost/thread/shared_mutex.hpp>
 #include <gutil/strings/substitute.h>
 
@@ -39,6 +40,9 @@
 #include "common/names.h"
 
 using boost::posix_time::seconds;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::try_to_lock;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
 using namespace strings;
@@ -65,6 +69,9 @@ const string STATESTORE_ID = "STATESTORE";
 // Template for metrics that measure the processing time for individual topics.
 const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";
 
+// Template for metrics that measure the interval between updates for individual topics.
+const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";
+
 // Duration, in ms, to sleep between attempts to reconnect to the
 // statestore after a failure.
 const int32_t SLEEP_INTERVAL_MS = 5000;
@@ -107,41 +114,42 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
       failure_detector_(new TimeoutFailureDetector(
           seconds(FLAGS_statestore_subscriber_timeout_seconds),
           seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      is_registered_(false),
       client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
-          FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
-          !FLAGS_ssl_client_ca_certificate.empty())),
-      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
+                FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
+                !FLAGS_ssl_client_ca_certificate.empty())),
+      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+      is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
   last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.last-recovery-time", "N/A");
-  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-interval-time");
-  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-duration");
-  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.heartbeat-interval-time");
-
   registration_id_metric_ = metrics->AddProperty<string>(
       "statestore-subscriber.registration-id", "N/A");
-
   client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
 }
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     bool is_transient, const UpdateCallback& callback) {
-  lock_guard<mutex> l(lock_);
+  lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
-  Callbacks* cb = &(update_callbacks_[topic_id]);
-  cb->callbacks.push_back(callback);
-  if (cb->processing_time_metric == NULL) {
-    cb->processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+  TopicRegistration& registration = topic_registrations_[topic_id];
+  registration.callbacks.push_back(callback);
+  if (registration.processing_time_metric == nullptr) {
+    registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
         CALLBACK_METRIC_PATTERN, topic_id);
+    registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+        UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
+    registration.update_interval_timer.Start();
   }
-  topic_registrations_[topic_id] = is_transient;
+  registration.is_transient = is_transient;
   return Status::OK();
 }
 
@@ -151,11 +159,10 @@ Status StatestoreSubscriber::Register() {
   RETURN_IF_ERROR(client_status);
 
   TRegisterSubscriberRequest request;
-  request.topic_registrations.reserve(update_callbacks_.size());
-  for (const UpdateCallbacks::value_type& topic: update_callbacks_) {
+  for (const auto& registration : topic_registrations_) {
     TTopicRegistration thrift_topic;
-    thrift_topic.topic_name = topic.first;
-    thrift_topic.is_transient = topic_registrations_[topic.first];
+    thrift_topic.topic_name = registration.first;
+    thrift_topic.is_transient = registration.second.is_transient;
     request.topic_registrations.push_back(thrift_topic);
   }
 
@@ -175,7 +182,6 @@ Status StatestoreSubscriber::Register() {
   } else {
     VLOG(1) << "No subscriber registration ID received from statestore";
   }
-  topic_update_interval_timer_.Start();
   heartbeat_interval_timer_.Start();
   return status;
 }
@@ -186,7 +192,7 @@ Status StatestoreSubscriber::Start() {
     // Take the lock to ensure that, if a topic-update is received during registration
     // (perhaps because Register() has succeeded, but we haven't finished setting up state
     // on the client side), UpdateState() will reject the message.
-    lock_guard<mutex> l(lock_);
+    lock_guard<shared_mutex> exclusive_lock(lock_);
     LOG(INFO) << "Starting statestore subscriber";
 
     // Backend must be started before registration
@@ -241,7 +247,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
     if (failure_detector_->GetPeerState(STATESTORE_ID) == FailureDetector::FAILED) {
       // When entering recovery mode, the class-wide lock_ is taken to
       // ensure mutual exclusion with any operations in flight.
-      lock_guard<mutex> l(lock_);
+      lock_guard<shared_mutex> exclusive_lock(lock_);
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
       connected_to_statestore_metric_->SetValue(false);
@@ -313,74 +319,127 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
 Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_deltas,
     const RegistrationId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
     bool* skipped) {
+  *skipped = false;  // Set to true below if the update must be retried later.
+  RETURN_IF_ERROR(CheckRegistrationId(registration_id));
+  // Put the updates into ascending order of topic name to match the lock acquisition
+  // order of TopicRegistration::update_lock.
+  vector<const TTopicDelta*> deltas_to_process;
+  for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
+  sort(deltas_to_process.begin(), deltas_to_process.end(),
+      [](const TTopicDelta* left, const TTopicDelta* right) {
+        return left->topic_name < right->topic_name;
+      });
+  // Unique locks to hold the 'update_lock' for each entry in 'deltas_to_process'. Locks
+  // are held until we finish processing the update to prevent any races with concurrent
+  // updates for the same topic.
+  vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
+
   // We don't want to block here because this is an RPC, and delaying the return causes
   // the statestore to delay sending further messages. The only time that lock_ might be
-  // taken concurrently is if:
+  // taken exclusively is if the subscriber is recovering, and has the lock held during
+  // RecoveryModeChecker(). In this case we skip all topics and don't update any metrics.
   //
-  // a) another update is still being processed (i.e. is still in UpdateState()). This
-  // could happen only when the subscriber has re-registered, and the statestore is still
-  // sending an update for the previous registration. In this case, return OK but set
-  // *skipped = true to tell the statestore to retry this update in the future.
+  // UpdateState() may run concurrently with itself in two cases:
+  // a) disjoint sets of topics are being updated. In that case the updates can proceed
+  // concurrently.
+  // b) another update for the same topics is still being processed (i.e. is still in
+  // UpdateState()). This could happen only when the subscriber has re-registered, and
+  // the statestore is still sending an update for the previous registration. In this
+  // case, we notice that the per-topic 'update_lock' is held, skip processing all
+  // of the topic updates and set *skipped = true so that the statestore will retry this
+  // update in the future.
   //
-  // b) the subscriber is recovering, and has the lock held during
-  // RecoveryModeChecker(). Similarly, we set *skipped = true.
   // TODO: Consider returning an error in this case so that the statestore will eventually
   // stop sending updates even if re-registration fails.
-  try_mutex::scoped_try_lock l(lock_);
-  if (l) {
-    *skipped = false;
-    RETURN_IF_ERROR(CheckRegistrationId(registration_id));
-
-    // Only record updates received when not in recovery mode
-    topic_update_interval_metric_->Update(
-        topic_update_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
-    MonotonicStopWatch sw;
-    sw.Start();
-
-    // Check the version ranges of all delta updates to ensure they can be applied
-    // to this subscriber. If any invalid ranges are found, request new update(s) with
-    // version ranges applicable to this subscriber.
-    bool found_unexpected_delta = false;
-    for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
-      TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
-      if (itr != current_topic_versions_.end()) {
-        if (delta.second.is_delta && delta.second.from_version != itr->second) {
-          LOG(ERROR) << "Unexpected delta update to topic '" << delta.first << "' of "
-                     << "version range (" << delta.second.from_version << ":"
-                     << delta.second.to_version << "]. Expected delta start version: "
-                     << itr->second;
-
-          subscriber_topic_updates->push_back(TTopicDelta());
-          TTopicDelta& update = subscriber_topic_updates->back();
-          update.topic_name = delta.second.topic_name;
-          update.__set_from_version(itr->second);
-          found_unexpected_delta = true;
-        } else {
-          // Update the current topic version
-          current_topic_versions_[delta.first] = delta.second.to_version;
-        }
-      }
+  shared_lock<shared_mutex> l(lock_, try_to_lock);
+  if (!l.owns_lock()) {
+    *skipped = true;
+    return Status::OK();
+  }
+
+  // First, acquire all the topic locks and update the interval metrics.
+  // Record the time we received the update before doing any processing to avoid including
+  // processing time in the interval metrics.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    // Skip updates to unregistered topics.
+    if (it == topic_registrations_.end()) {
+      LOG(ERROR) << "Unexpected delta update for unregistered topic: "
+                 << delta.topic_name;
+      continue;
+    }
+    TopicRegistration& registration = it->second;
+    unique_lock<mutex> ul(registration.update_lock, try_to_lock);
+    if (!ul.owns_lock()) {
+      // Statestore sent out concurrent topic updates. Avoid blocking the RPC by skipping
+      // the topic.
+      LOG(ERROR) << "Could not acquire lock for topic " << delta.topic_name << ". "
+                 << "Skipping update.";
+      *skipped = true;
+      return Status::OK();
     }
+    double interval =
+        registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
+    registration.update_interval_metric->Update(interval);
+    topic_update_interval_metric_->Update(interval);
 
-    // Skip calling the callbacks when an unexpected delta update is found.
-    if (!found_unexpected_delta) {
-      for (const UpdateCallbacks::value_type& callbacks: update_callbacks_) {
-        MonotonicStopWatch sw;
-        sw.Start();
-        for (const UpdateCallback& callback: callbacks.second.callbacks) {
-          // TODO: Consider filtering the topics to only send registered topics to
-          // callbacks
-          callback(incoming_topic_deltas, subscriber_topic_updates);
-        }
-        callbacks.second.processing_time_metric->Update(
-            sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-      }
+    // Hold onto lock until we've finished processing the update.
+    topic_update_locks[i].swap(ul);
+  }
+
+  MonotonicStopWatch sw;
+  sw.Start();
+  // Second, do the actual processing of topic updates that we validated and acquired
+  // locks for above.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    if (delta.is_delta && registration.current_topic_version != -1
+      && delta.from_version != registration.current_topic_version) {
+      // Received a delta update for the wrong version. Log an error and send back the
+      // expected version to the statestore to request a new update with the correct
+      // version range.
+      LOG(ERROR) << "Unexpected delta update to topic '" << delta.topic_name << "' of "
+                 << "version range (" << delta.from_version << ":"
+                 << delta.to_version << "]. Expected delta start version: "
+                 << registration.current_topic_version;
+
+      subscriber_topic_updates->push_back(TTopicDelta());
+      TTopicDelta& update = subscriber_topic_updates->back();
+      update.topic_name = delta.topic_name;
+      update.__set_from_version(registration.current_topic_version);
+      continue;
     }
-    sw.Stop();
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-  } else {
-    *skipped = true;
+    // The topic version in the update is valid, process the update.
+    MonotonicStopWatch update_callback_sw;
+    update_callback_sw.Start();
+    for (const UpdateCallback& callback : registration.callbacks) {
+      callback(incoming_topic_deltas, subscriber_topic_updates);
+    }
+    update_callback_sw.Stop();
+    registration.current_topic_version = delta.to_version;
+    registration.processing_time_metric->Update(
+        update_callback_sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+  }
+
+  // Third and finally, reset the interval timers so they correctly measure the
+  // time between RPCs, excluding processing time.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    registration.update_interval_timer.Reset();
   }
+  sw.Stop();
+  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index e8b2204..f102cae 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -24,6 +24,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
 
 #include "statestore/statestore.h"
 #include "util/stopwatch.h"
@@ -84,9 +85,9 @@ class StatestoreSubscriber {
   /// Function called to update a service with new state. Called in a
   /// separate thread to the one in which it is registered.
   //
-  /// Every UpdateCallback is invoked every time that an update is
-  /// received from the statestore. Therefore the callback should not
-  /// assume that the TopicDeltaMap contains an entry for their
+  /// Every UpdateCallback is invoked every time that an update for the
+  /// topic it is registered for is received from the statestore. Callbacks
+  /// should not assume that the TopicDeltaMap contains an entry for their
   /// particular topic of interest.
   //
   /// If a delta for a particular topic does not have the 'is_delta'
@@ -145,55 +146,7 @@ class StatestoreSubscriber {
   /// Thread in which RecoveryModeChecker runs.
   std::unique_ptr<Thread> recovery_mode_thread_;
 
-  /// Class-wide lock. Protects all subsequent members. Most private methods must
-  /// be called holding this lock; this is noted in the method comments.
-  boost::mutex lock_;
-
-  /// Set to true after Register(...) is successful, after which no
-  /// more topics may be subscribed to.
-  bool is_registered_;
-
-  /// Protects registration_id_. Must be taken after lock_ if both are to be taken
-  /// together.
-  boost::mutex registration_id_lock_;
-
-  /// Set during Register(), this is the unique ID of the current registration with the
-  /// statestore. If this subscriber must recover, or disconnects and then reconnects, the
-  /// registration_id_ will change after Register() is called again. This allows the
-  /// subscriber to reject communication from the statestore that pertains to a previous
-  /// registration.
-  RegistrationId registration_id_;
-
-  struct Callbacks {
-    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
-    /// topic.
-    StatsMetric<double>* processing_time_metric;
-
-    /// List of callbacks to invoke for this topic.
-    std::vector<UpdateCallback> callbacks;
-  };
-
-  /// Mapping of topic ids to their associated callbacks. Because this mapping
-  /// stores a pointer to an UpdateCallback, memory errors will occur if an UpdateCallback
-  /// is deleted before being unregistered. The UpdateCallback destructor checks for
-  /// such problems, so that we will have an assertion failure rather than a memory error.
-  typedef boost::unordered_map<Statestore::TopicId, Callbacks> UpdateCallbacks;
-
-  /// Callback for all services that have registered for updates (indexed by the associated
-  /// SubscriptionId), and associated lock.
-  UpdateCallbacks update_callbacks_;
-
-  /// One entry for every topic subscribed to. The value is whether this subscriber
-  /// considers this topic to be 'transient', that is any updates it makes will be deleted
-  /// upon failure or disconnection.
-  std::map<Statestore::TopicId, bool> topic_registrations_;
-
-  /// Mapping of TopicId to the last version of the topic this subscriber successfully
-  /// processed.
-  typedef boost::unordered_map<Statestore::TopicId, int64_t> TopicVersionMap;
-  TopicVersionMap current_topic_versions_;
-
-  /// statestore client cache - only one client is ever used.
+  /// statestore client cache - only one client is ever used. Initialized in constructor.
   boost::scoped_ptr<StatestoreClientCache> client_cache_;
 
   /// MetricGroup instance that all metrics are registered in. Not owned by this class.
@@ -208,25 +161,82 @@ class StatestoreSubscriber {
   /// When the last recovery happened.
   StringProperty* last_recovery_time_metric_;
 
-  /// Accumulated statistics on the frequency of topic-update messages
+  /// Accumulated statistics on the frequency of topic-update messages, 
including samples
+  /// from all topics.
   StatsMetric<double>* topic_update_interval_metric_;
 
-  /// Tracks the time between topic-update mesages
-  MonotonicStopWatch topic_update_interval_timer_;
-
   /// Accumulated statistics on the time taken to process each topic-update 
message from
   /// the statestore (that is, to call all callbacks)
   StatsMetric<double>* topic_update_duration_metric_;
 
-  /// Tracks the time between heartbeat mesages
-  MonotonicStopWatch heartbeat_interval_timer_;
-
   /// Accumulated statistics on the frequency of heartbeat messages
   StatsMetric<double>* heartbeat_interval_metric_;
 
+  /// Tracks the time between heartbeat messages. Only updated by Heartbeat(), 
which
+  /// should not run concurrently with itself.
+  MonotonicStopWatch heartbeat_interval_timer_;
+
   /// Current registration ID, in string form.
   StringProperty* registration_id_metric_;
 
+  /// Object-wide lock that protects the below members. Must be held 
exclusively when
+  /// modifying the members, except when modifying TopicRegistrations - see
+  /// TopicRegistration::update_lock for details of locking there. Held in 
shared mode
+  /// when processing topic updates to prevent concurrent updates to other 
state. Most
+  /// private methods must be called holding this lock; this is noted in the 
method
+  /// comments.
+  boost::shared_mutex lock_;
+
+  /// Set to true after Register(...) is successful, after which no
+  /// more topics may be subscribed to.
+  bool is_registered_;
+
+  /// Protects registration_id_. Must be taken after lock_ if both are to be 
taken
+  /// together.
+  boost::mutex registration_id_lock_;
+
+  /// Set during Register(), this is the unique ID of the current registration 
with the
+  /// statestore. If this subscriber must recover, or disconnects and then 
reconnects, the
+  /// registration_id_ will change after Register() is called again. This 
allows the
+  /// subscriber to reject communication from the statestore that pertains to 
a previous
+  /// registration.
+  RegistrationId registration_id_;
+
+  struct TopicRegistration {
+    /// Held when processing a topic update. 'StatestoreSubscriber::lock_' must be held in
+    /// shared mode before acquiring this lock. If taking multiple update locks, they must
+    /// be acquired in ascending order of topic name.
+    boost::mutex update_lock;
+
+    /// Whether the subscriber considers this topic to be "transient", that is any updates
+    /// it makes will be deleted upon failure or disconnection.
+    bool is_transient = false;
+
+    /// The last version of the topic this subscriber processed.
+    /// -1 if no updates have been processed yet.
+    int64_t current_topic_version = -1;
+
+    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
+    /// topic.
+    StatsMetric<double>* processing_time_metric = nullptr;
+
+    /// Tracks the time between topic-update messages to update 'update_interval_metric'.
+    MonotonicStopWatch update_interval_timer;
+
+    /// Owned by the MetricGroup instance. Tracks the time between the end of the last
+    /// update RPC for this topic and the start of the next.
+    StatsMetric<double>* update_interval_metric = nullptr;
+
+    /// Callbacks of all services that have registered for updates to this topic.
+    std::vector<UpdateCallback> callbacks;
+  };
+
+  /// One entry for every topic subscribed to. 'lock_' must be held exclusively to add or
+  /// remove entries from the map or held as a shared lock to look up entries in the map.
+  /// Modifications to the contents of each TopicRegistration are protected by
+  /// TopicRegistration::update_lock.
+  boost::unordered_map<Statestore::TopicId, TopicRegistration> 
topic_registrations_;
+
   /// Subscriber thrift implementation, needs to access UpdateState
   friend class StatestoreSubscriberThriftIf;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8f4ddbf..1072dee 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -17,6 +17,10 @@
 
 #include "statestore/statestore.h"
 
+#include <algorithm>
+#include <tuple>
+#include <utility>
+
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
 #include <thrift/Thrift.h>
@@ -36,6 +40,12 @@
 
 #include "common/names.h"
 
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::upgrade_lock;
+using boost::upgrade_to_unique_lock;
+using std::forward_as_tuple;
+using std::piecewise_construct;
 using namespace apache::thrift;
 using namespace impala;
 using namespace rapidjson;
@@ -50,6 +60,15 @@ DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) 
Number of threads us
 DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in 
ms) with"
     " which the statestore sends topic updates to subscribers.");
 
+// Priority updates are sent out much more frequently. They are assumed to be small
+// amounts of data that take a small amount of time to process. Assuming each update
+// takes < 1ms to process, sending out an update every 100ms will consume less than
+// 1% of a CPU on each subscriber.
+DEFINE_int32(statestore_num_priority_update_threads, 10, "(Advanced) Number of threads "
+    "used to send prioritized topic updates in parallel to all registered subscribers.");
+DEFINE_int32(statestore_priority_update_frequency_ms, 100, "(Advanced) Frequency (in ms) "
+    "with which the statestore sends prioritized topic updates to subscribers.");
+
 DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of 
threads used to "
     " send heartbeats in parallel to all registered subscribers.");
 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency 
(in ms) with"
@@ -87,18 +106,23 @@ const string STATESTORE_TOTAL_KEY_SIZE_BYTES = 
"statestore.total-key-size-bytes"
 const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = 
"statestore.total-value-size-bytes";
 const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = 
"statestore.total-topic-size-bytes";
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
+const string STATESTORE_PRIORITY_UPDATE_DURATION =
+    "statestore.priority-topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
 // between the case where a Topic is empty and the case where the Topic only contains
-// an item with the initial version.
+// an entry with the initial version.
 const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
 
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
+const string Statestore::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
+const string Statestore::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
+
 typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn;
 
 class StatestoreThriftIf : public StatestoreServiceIf {
@@ -127,46 +151,55 @@ void Statestore::TopicEntry::SetValue(const 
Statestore::TopicEntry::Value& bytes
   version_ = version;
 }
 
-Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
-  TopicEntryMap::iterator entry_it = entries_.find(key);
-  int64_t key_size_delta = 0;
-  int64_t value_size_delta = 0;
-  if (entry_it == entries_.end()) {
-    entry_it = entries_.insert(make_pair(key, TopicEntry())).first;
-    key_size_delta += key.size();
-  } else {
-    // Delete the old item from the version history. There is no need to search the
-    // version_history because there should only be at most a single item in the history
-    // at any given time
-    topic_update_log_.erase(entry_it->second.version());
-    value_size_delta -= entry_it->second.value().size();
+vector<Statestore::TopicEntry::Version> Statestore::Topic::Put(
+    const std::vector<TTopicItem>& entries) {
+  vector<Statestore::TopicEntry::Version> versions;
+  versions.reserve(entries.size());
+
+  // Acquire exclusive lock - we modify the entries, the update log and the size totals.
+  lock_guard<shared_mutex> write_lock(lock_);
+  for (const TTopicItem& entry: entries) {
+    TopicEntryMap::iterator entry_it = entries_.find(entry.key);
+    int64_t key_size_delta = 0;
+    int64_t value_size_delta = 0;
+    if (entry_it == entries_.end()) {
+      entry_it = entries_.emplace(entry.key, TopicEntry()).first;
+      key_size_delta += entry.key.size();
+    } else {
+      // Delete the old entry from the version history. There is no need to search the
+      // version_history because there should only be at most a single entry in the
+      // history at any given time.
+      topic_update_log_.erase(entry_it->second.version());
+      value_size_delta -= entry_it->second.value().size();
+    }
+    value_size_delta += entry.value.size();
+
+    entry_it->second.SetValue(entry.value, ++last_version_);
+    entry_it->second.SetDeleted(entry.deleted);
+    topic_update_log_.emplace(entry_it->second.version(), entry.key);
+
+    total_key_size_bytes_ += key_size_delta;
+    total_value_size_bytes_ += value_size_delta;
+    DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
+    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
+    key_size_metric_->Increment(key_size_delta);
+    value_size_metric_->Increment(value_size_delta);
+    topic_size_metric_->Increment(key_size_delta + value_size_delta);
+    versions.push_back(entry_it->second.version());
   }
-  value_size_delta += bytes.size();
-
-  entry_it->second.SetValue(bytes, ++last_version_);
-  entry_it->second.SetDeleted(is_deleted);
-  topic_update_log_.insert(make_pair(entry_it->second.version(), key));
-
-  total_key_size_bytes_ += key_size_delta;
-  total_value_size_bytes_ += value_size_delta;
-  DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
-  DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-  key_size_metric_->Increment(key_size_delta);
-  value_size_metric_->Increment(value_size_delta);
-  topic_size_metric_->Increment(key_size_delta + value_size_delta);
-
-  return entry_it->second.version();
+  return versions;
 }
 
 void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     const Statestore::TopicEntryKey& key) {
+  // Acquire exclusive lock - we are modifying the topic.
+  lock_guard<shared_mutex> write_lock(lock_);
   TopicEntryMap::iterator entry_it = entries_.find(key);
   if (entry_it != entries_.end() && entry_it->second.version() == version) {
     // Add a new entry with the the version history for this deletion and 
remove the old
     // entry
     topic_update_log_.erase(version);
-    topic_update_log_.insert(make_pair(++last_version_, key));
+    topic_update_log_.emplace(++last_version_, key);
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
     entry_it->second.SetDeleted(true);
@@ -174,6 +207,79 @@ void 
Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
   }
 }
 
+void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
+    TopicEntry::Version last_processed_version, TTopicDelta* delta) {
+  // Appends to 'delta' every entry logged after 'last_processed_version'. If that
+  // version is > TOPIC_INITIAL_VERSION this is a delta update; otherwise this is a new
+  // subscriber, so send it a non-delta update with all non-deleted entries in the topic.
+  delta->is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
+  delta->__set_from_version(last_processed_version);
+  {
+    // Acquire shared lock - we are not modifying the topic.
+    shared_lock<shared_mutex> read_lock(lock_);
+    TopicUpdateLog::const_iterator next_update =
+        topic_update_log_.upper_bound(last_processed_version);
+
+    uint64_t topic_size = 0;
+    for (; next_update != topic_update_log_.end(); ++next_update) {
+      TopicEntryMap::const_iterator itr = entries_.find(next_update->second);
+      DCHECK(itr != entries_.end());
+      const TopicEntry& topic_entry = itr->second;
+      // Don't send deleted entries for non-delta updates.
+      if (!delta->is_delta && topic_entry.is_deleted()) {
+        continue;
+      }
+      delta->topic_entries.emplace_back();
+      TTopicItem& delta_entry = delta->topic_entries.back();
+      delta_entry.key = itr->first;
+      delta_entry.value = topic_entry.value();
+      delta_entry.deleted = topic_entry.is_deleted();
+      topic_size += delta_entry.key.size() + delta_entry.value.size();
+    }
+
+    if (!delta->is_delta &&
+        last_version_ > Subscriber::TOPIC_INITIAL_VERSION) {
+      VLOG_QUERY << "Preparing initial " << delta->topic_name
+                 << " topic update for " << subscriber_id << ". Size = "
+                 << PrettyPrinter::Print(topic_size, TUnit::BYTES);
+    }
+
+    if (!topic_update_log_.empty()) {
+      // The largest version for this topic will be the last entry in the version history
+      // map.
+      delta->__set_to_version(topic_update_log_.rbegin()->first);
+    } else {
+      // There are no updates in the version history
+      delta->__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
+    }
+  }
+}
+void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
+  // Shared lock: topic fields are only read while they are copied into 'topic_json'.
+  shared_lock<shared_mutex> read_lock(lock_);
+  Value topic_id(topic_id_.c_str(), document->GetAllocator());
+  topic_json->AddMember("topic_id", topic_id, document->GetAllocator());
+  topic_json->AddMember("num_entries",
+      static_cast<uint64_t>(entries_.size()),
+      document->GetAllocator());
+  topic_json->AddMember("version", last_version_, document->GetAllocator());
+
+  const int64_t key_size = total_key_size_bytes_;
+  const int64_t value_size = total_value_size_bytes_;
+  Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("key_size", key_size_json, document->GetAllocator());
+  Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("value_size", value_size_json, document->GetAllocator());
+  Value total_size_json(
+      PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("total_size", total_size_json, document->GetAllocator());
+  topic_json->AddMember("prioritized", IsPrioritizedTopic(topic_id_),
+      document->GetAllocator());
+}
+
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, const TNetworkAddress& 
network_address,
     const vector<TTopicRegistration>& subscribed_topics)
@@ -181,47 +287,94 @@ Statestore::Subscriber::Subscriber(const SubscriberId& 
subscriber_id,
       registration_id_(registration_id),
       network_address_(network_address) {
   for (const TTopicRegistration& topic: subscribed_topics) {
-    TopicState topic_state;
-    topic_state.is_transient = topic.is_transient;
-    topic_state.last_version = TOPIC_INITIAL_VERSION;
-    subscribed_topics_[topic.topic_name] = topic_state;
+    GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
+        forward_as_tuple(topic.topic_name), 
forward_as_tuple(topic.is_transient));
   }
 }
 
-void Statestore::Subscriber::AddTransientUpdate(const TopicId& topic_id,
-    const TopicEntryKey& topic_key, TopicEntry::Version version) {
+bool Statestore::Subscriber::AddTransientEntries(const TopicId& topic_id,
+    const vector<TTopicItem>& entries,
+    const vector<TopicEntry::Version>& entry_versions) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  DCHECK_EQ(entries.size(), entry_versions.size());
   // Only record the update if the topic is transient
-  const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
-  DCHECK(topic_it != subscribed_topics_.end());
-  if (topic_it->second.is_transient == true) {
-    transient_entries_[make_pair(topic_id, topic_key)] = version;
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  if (topic_it->second.is_transient) {
+    if (unregistered_) return false;  // The caller must delete the entries itself.
+    for (size_t i = 0; i < entries.size(); ++i) {
+      topic_it->second.transient_entries_[entries[i].key] = entry_versions[i];
+    }
   }
+  return true;
+}
+
+void Statestore::Subscriber::DeleteAllTransientEntries(TopicMap* global_topics) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  for (const Topics* subscribed_topics :
+      {&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
+    for (const auto& topic : *subscribed_topics) {
+      auto global_topic_it = global_topics->find(topic.first);
+      DCHECK(global_topic_it != global_topics->end());
+      for (const auto& transient_entry : topic.second.transient_entries_) {
+        global_topic_it->second.DeleteIfVersionsMatch(transient_entry.second,
+            transient_entry.first);
+      }
+    }
+  }
+  unregistered_ = true;  // AddTransientEntries() now rejects transient entries.
+}
+
+int64_t Statestore::Subscriber::NumTransientEntries() {
+  lock_guard<mutex> l(transient_entry_lock_);
+  int64_t total = 0;  // Summed over both prioritized and non-prioritized topics.
+  for (const auto& topic : priority_subscribed_topics_) {
+    total += topic.second.transient_entries_.size();
+  }
+  for (const auto& topic : non_priority_subscribed_topics_) {
+    total += topic.second.transient_entries_.size();
+  }
+  return total;
 }
 
 Statestore::TopicEntry::Version Statestore::Subscriber::LastTopicVersionProcessed(
     const TopicId& topic_id) const {
-  Topics::const_iterator itr = subscribed_topics_.find(topic_id);
-  return itr == subscribed_topics_.end() ?
-      TOPIC_INITIAL_VERSION : itr->second.last_version;
+  const Topics& subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::const_iterator itr = subscribed_topics.find(topic_id);
+  return itr == subscribed_topics.end() ? TOPIC_INITIAL_VERSION
+                                        : itr->second.last_version.Load();
 }
 
 void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_id,
     TopicEntry::Version version) {
-  subscribed_topics_[topic_id].last_version = version;
+  // Safe to call concurrently for different topics because 'subscribed_topics' is not
+  // modified: only the per-topic 'last_version' is updated, via Store().
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  topic_it->second.last_version.Store(version);
 }
 
 Statestore::Statestore(MetricGroup* metrics)
-  : exit_flag_(false),
-    subscriber_topic_update_threadpool_("statestore-update",
+  : subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
         FLAGS_statestore_num_update_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, 
_2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::TOPIC_UPDATE, _1, _2)),
+    subscriber_priority_topic_update_threadpool_("statestore-priority-update",
+        "subscriber-priority-update-worker",
+        FLAGS_statestore_num_priority_update_threads,
+        FLAGS_statestore_max_subscribers,
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::PRIORITY_TOPIC_UPDATE, _1, _2)),
     subscriber_heartbeat_threadpool_("statestore-heartbeat",
         "subscriber-heartbeat-worker",
         FLAGS_statestore_num_heartbeat_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, 
_2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::HEARTBEAT, _1, _2)),
     update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
@@ -245,6 +398,8 @@ Statestore::Statestore(MetricGroup* metrics)
 
   topic_update_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, 
STATESTORE_UPDATE_DURATION);
+  priority_topic_update_duration_metric_ = 
StatsMetric<double>::CreateAndRegister(
+      metrics, STATESTORE_PRIORITY_UPDATE_DURATION);
   heartbeat_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, 
STATESTORE_HEARTBEAT_DURATION);
 
@@ -254,6 +409,7 @@ Statestore::Statestore(MetricGroup* metrics)
 
 Status Statestore::Init() {
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
+  RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
   return Status::OK();
 }
@@ -275,43 +431,21 @@ void Statestore::RegisterWebpages(Webserver* webserver) {
 void Statestore::TopicsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   lock_guard<mutex> l(subscribers_lock_);
-  lock_guard<mutex> t(topic_lock_);
+  shared_lock<shared_mutex> t(topics_map_lock_);
 
   Value topics(kArrayType);
 
-  for (const TopicMap::value_type& topic: topics_) {
+  for (TopicMap::value_type& topic: topics_) {
     Value topic_json(kObjectType);
-
-    Value topic_id(topic.second.id().c_str(), document->GetAllocator());
-    topic_json.AddMember("topic_id", topic_id, document->GetAllocator());
-    topic_json.AddMember("num_entries",
-        static_cast<uint64_t>(topic.second.entries().size()),
-        document->GetAllocator());
-    topic_json.AddMember(
-        "version", topic.second.last_version(), document->GetAllocator());
-
+    topic.second.ToJson(document, &topic_json);
     SubscriberId oldest_subscriber_id;
     TopicEntry::Version oldest_subscriber_version =
         GetMinSubscriberTopicVersion(topic.first, &oldest_subscriber_id);
-
     topic_json.AddMember("oldest_version", oldest_subscriber_version,
         document->GetAllocator());
     Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
     topic_json.AddMember("oldest_id", oldest_id, document->GetAllocator());
-
-    int64_t key_size = topic.second.total_key_size_bytes();
-    int64_t value_size = topic.second.total_value_size_bytes();
-    Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("key_size", key_size_json, document->GetAllocator());
-    Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("value_size", value_size_json, document->GetAllocator());
-    Value total_size_json(
-        PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("total_size", total_size_json, document->GetAllocator());
     topics.PushBack(topic_json, document->GetAllocator());
   }
   document->AddMember("topics", topics, document->GetAllocator());
 }
@@ -330,11 +464,18 @@ void Statestore::SubscribersHandler(const 
Webserver::ArgumentMap& args,
         document->GetAllocator());
     sub_json.AddMember("address", address, document->GetAllocator());
 
+    int64_t num_priority_topics =
+        subscriber.second->priority_subscribed_topics().size();
+    int64_t num_non_priority_topics =
+        subscriber.second->non_priority_subscribed_topics().size();
     sub_json.AddMember("num_topics",
-        static_cast<uint64_t>(subscriber.second->subscribed_topics().size()),
+        static_cast<uint64_t>(num_priority_topics + num_non_priority_topics),
+        document->GetAllocator());
+    sub_json.AddMember("num_priority_topics",
+        static_cast<uint64_t>(num_priority_topics),
         document->GetAllocator());
     sub_json.AddMember("num_transient",
-        static_cast<uint64_t>(subscriber.second->transient_entries().size()),
+        static_cast<uint64_t>(subscriber.second->NumTransientEntries()),
         document->GetAllocator());
 
     Value 
registration_id(PrintId(subscriber.second->registration_id()).c_str(),
@@ -378,14 +519,20 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
   // Create any new topics first, so that when the subscriber is first sent a 
topic update
   // by the worker threads its topics are guaranteed to exist.
   {
-    lock_guard<mutex> l(topic_lock_);
+    // Start with a shared read lock when checking the map. In the common case 
the topic
+    // will already exist, so we don't need to immediately get the exclusive 
lock and
+    // block other threads.
+    upgrade_lock<shared_mutex> topic_read_lock(topics_map_lock_);
     for (const TTopicRegistration& topic: topic_registrations) {
       TopicMap::iterator topic_it = topics_.find(topic.topic_name);
       if (topic_it == topics_.end()) {
+        // Upgrade to an exclusive lock when modifying the map.
+        upgrade_to_unique_lock<shared_mutex> topic_write_lock(topic_read_lock);
         LOG(INFO) << "Creating new topic: ''" << topic.topic_name
                   << "' on behalf of subscriber: '" << subscriber_id;
-        topics_.insert(make_pair(topic.topic_name, Topic(topic.topic_name,
-            key_size_metric_, value_size_metric_, topic_size_metric_)));
+        topics_.emplace(piecewise_construct, 
forward_as_tuple(topic.topic_name),
+            forward_as_tuple(topic.topic_name, key_size_metric_, 
value_size_metric_,
+            topic_size_metric_));
       }
     }
   }
@@ -400,7 +547,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
     UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
     shared_ptr<Subscriber> current_registration(
         new Subscriber(subscriber_id, *registration_id, location, 
topic_registrations));
-    subscribers_.insert(make_pair(subscriber_id, current_registration));
+    subscribers_.emplace(subscriber_id, current_registration);
     failure_detector_->UpdateHeartbeat(
         PrintId(current_registration->registration_id()), true);
     num_subscribers_metric_->SetValue(subscribers_.size());
@@ -409,6 +556,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
     // Add the subscriber to the update queue, with an immediate schedule.
     ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
+    RETURN_IF_ERROR(OfferUpdate(update, 
&subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
   }
 
@@ -428,7 +576,8 @@ bool Statestore::FindSubscriber(const SubscriberId& 
subscriber_id,
   return true;
 }
 
-Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* 
update_skipped) {
+Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind 
update_kind,
+    bool* update_skipped) {
   // Time any successful RPCs (i.e. those for which UpdateState() completed, 
even though
   // it may have returned an error.)
   MonotonicStopWatch sw;
@@ -436,7 +585,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, 
bool* update_skipped)
 
   // First thing: make a list of updates to send
   TUpdateStateRequest update_state_request;
-  GatherTopicUpdates(*subscriber, &update_state_request);
+  GatherTopicUpdates(*subscriber, update_kind, &update_state_request);
+  // 'subscriber' may not be subscribed to any updates of 'update_kind'.
+  if (update_state_request.topic_deltas.empty()) {
+    *update_skipped = false;
+    return Status::OK();
+  }
 
   // Set the expected registration ID, so that the subscriber can reject this 
update if
   // they have moved on to a new registration instance.
@@ -452,9 +606,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, 
bool* update_skipped)
   RETURN_IF_ERROR(client.DoRpc(
       &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, 
&response));
 
+  StatsMetric<double>* update_duration_metric =
+      update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE ?
+      priority_topic_update_duration_metric_ : topic_update_duration_metric_;
   status = Status(response.status);
   if (!status.ok()) {
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 
* 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 
1000.0));
     return status;
   }
 
@@ -463,7 +620,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, 
bool* update_skipped)
     // The subscriber skipped processing this update. We don't consider this a 
failure
     // - subscribers can decide what they do with any update - so, return OK 
and set
     // update_skipped so the caller can compensate.
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 
* 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 
1000.0));
     return Status::OK();
   }
 
@@ -476,7 +633,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, 
bool* update_skipped)
 
   // Thirdly: perform any / all updates returned by the subscriber
   {
-    lock_guard<mutex> l(topic_lock_);
+    shared_lock<shared_mutex> l(topics_map_lock_);
     for (const TTopicDelta& update: response.topic_updates) {
       TopicMap::iterator topic_it = topics_.find(update.topic_name);
       if (topic_it == topics_.end()) {
@@ -498,80 +655,49 @@ Status Statestore::SendTopicUpdate(Subscriber* 
subscriber, bool* update_skipped)
         subscriber->SetLastTopicVersionProcessed(topic_it->first, 
update.from_version);
       }
 
-      Topic* topic = &topic_it->second;
-      for (const TTopicItem& item: update.topic_entries) {
-        subscriber->AddTransientUpdate(update.topic_name, item.key,
-            topic->Put(item.key, item.value, item.deleted));
+      Topic& topic = topic_it->second;
+      // Update the topic and add transient entries separately to avoid 
holding both
+      // locks at the same time and preventing concurrent topic updates.
+      vector<TopicEntry::Version> entry_versions = 
topic.Put(update.topic_entries);
+      if (!subscriber->AddTransientEntries(
+          update.topic_name, update.topic_entries, entry_versions)) {
+        // Subscriber was unregistered - clean up the transient entries.
+        for (int i = 0; i < update.topic_entries.size(); ++i) {
+          topic.DeleteIfVersionsMatch(entry_versions[i], 
update.topic_entries[i].key);
+        }
       }
     }
   }
-  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 
1000.0));
+  update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 
1000.0));
   return Status::OK();
 }
 
 void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
-    TUpdateStateRequest* update_state_request) {
+    UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
   {
-    lock_guard<mutex> l(topic_lock_);
-    for (const Subscriber::Topics::value_type& subscribed_topic:
-         subscriber.subscribed_topics()) {
-      TopicMap::const_iterator topic_it = topics_.find(subscribed_topic.first);
+    DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+        || update_kind ==  UpdateKind::PRIORITY_TOPIC_UPDATE)
+        << static_cast<int>(update_kind);
+    const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
+    const Subscriber::Topics& subscribed_topics = is_priority
+        ? subscriber.priority_subscribed_topics()
+        : subscriber.non_priority_subscribed_topics();
+    shared_lock<shared_mutex> l(topics_map_lock_);
+    for (const auto& subscribed_topic: subscribed_topics) {
+      auto topic_it = topics_.find(subscribed_topic.first);
       DCHECK(topic_it != topics_.end());
-
       TopicEntry::Version last_processed_version =
           subscriber.LastTopicVersionProcessed(topic_it->first);
-      const Topic& topic = topic_it->second;
 
       TTopicDelta& topic_delta =
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
-
-      // If the subscriber version is > 0, send this update as a delta. 
Otherwise, this is
-      // a new subscriber so send them a non-delta update that includes all 
items in the
-      // topic.
-      topic_delta.is_delta = last_processed_version > 
Subscriber::TOPIC_INITIAL_VERSION;
-      topic_delta.__set_from_version(last_processed_version);
-
-      TopicUpdateLog::const_iterator next_update =
-          topic.topic_update_log().upper_bound(last_processed_version);
-
-      uint64_t topic_size = 0;
-      for (; next_update != topic.topic_update_log().end(); ++next_update) {
-        TopicEntryMap::const_iterator itr = 
topic.entries().find(next_update->second);
-        DCHECK(itr != topic.entries().end());
-        const TopicEntry& topic_entry = itr->second;
-        // Don't send deleted entries for non-delta updates.
-        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
-          continue;
-        }
-        topic_delta.topic_entries.push_back(TTopicItem());
-        TTopicItem& topic_item = topic_delta.topic_entries.back();
-        topic_item.key = itr->first;
-        topic_item.value = topic_entry.value();
-        topic_item.deleted = topic_entry.is_deleted();
-        topic_size += topic_item.key.size() + topic_item.value.size();
-      }
-
-      if (!topic_delta.is_delta &&
-          topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
-                   << " topic update for " << subscriber.id() << ". Size = "
-                   << PrettyPrinter::Print(topic_size, TUnit::BYTES);
-      }
-
-      if (topic.topic_update_log().size() > 0) {
-        // The largest version for this topic will be the last item in the 
version history
-        // map.
-        topic_delta.__set_to_version(topic.topic_update_log().rbegin()->first);
-      } else {
-        // There are no updates in the version history
-        topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
-      }
+      topic_it->second.BuildDelta(subscriber.id(), last_processed_version, 
&topic_delta);
     }
   }
 
   // Fill in the min subscriber topic version. This must be done after 
releasing
-  // topic_lock_.
+  // topics_map_lock_.
   lock_guard<mutex> l(subscribers_lock_);
   typedef map<TopicId, TTopicDelta> TopicDeltaMap;
   for (TopicDeltaMap::value_type& topic_delta: 
update_state_request->topic_deltas) {
@@ -586,8 +712,8 @@ Statestore::TopicEntry::Version 
Statestore::GetMinSubscriberTopicVersion(
   bool found = false;
   // Find the minimum version processed for this topic across all topic 
subscribers.
   for (const SubscriberMap::value_type& subscriber: subscribers_) {
-    if (subscriber.second->subscribed_topics().find(topic_id) !=
-        subscriber.second->subscribed_topics().end()) {
+    auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
+    if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
       found = true;
       TopicEntry::Version last_processed_version =
           subscriber.second->LastTopicVersionProcessed(topic_id);
@@ -600,15 +726,33 @@ Statestore::TopicEntry::Version 
Statestore::GetMinSubscriberTopicVersion(
   return found ? min_topic_version : Subscriber::TOPIC_INITIAL_VERSION;
 }
 
-bool Statestore::ShouldExit() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  return exit_flag_;
+bool Statestore::IsPrioritizedTopic(const string& topic) {
+  return topic == IMPALA_MEMBERSHIP_TOPIC || topic == 
IMPALA_REQUEST_QUEUE_TOPIC;
 }
 
-void Statestore::SetExitFlag() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  exit_flag_ = true;
-  subscriber_topic_update_threadpool_.Shutdown();
+const char* Statestore::GetUpdateKindName(UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return "topic update";
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return "priority topic update";
+    case UpdateKind::HEARTBEAT:
+      return "heartbeat";
+  }
+  DCHECK(false);
+}
+
+ThreadPool<Statestore::ScheduledSubscriberUpdate>* Statestore::GetThreadPool(
+    UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return &subscriber_topic_update_threadpool_;
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return &subscriber_priority_topic_update_threadpool_;
+    case UpdateKind::HEARTBEAT:
+      return &subscriber_heartbeat_threadpool_;
+  }
+  DCHECK(false);
 }
 
 Status Statestore::SendHeartbeat(Subscriber* subscriber) {
@@ -630,8 +774,9 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
   return Status::OK();
 }
 
-void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     const ScheduledSubscriberUpdate& update) {
+  const bool is_heartbeat = update_kind == UpdateKind::HEARTBEAT;
   int64_t update_deadline = update.deadline;
   shared_ptr<Subscriber> subscriber;
   // Check if the subscriber has re-registered, in which case we can ignore
@@ -639,7 +784,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
   if (!FindSubscriber(update.subscriber_id, update.registration_id, 
&subscriber)) {
     return;
   }
-  const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
+  const char* update_kind_str = GetUpdateKindName(update_kind);
   if (update_deadline != 0) {
     // Wait until deadline.
     int64_t diff_ms = update_deadline - UnixMillis();
@@ -654,7 +799,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
       return;
     }
     diff_ms = std::abs(diff_ms);
-    VLOG(3) << "Sending " << hb_type << " message to: " << update.subscriber_id
+    VLOG(3) << "Sending " << update_kind_str << " message to: " << 
update.subscriber_id
         << " (deadline accuracy: " << diff_ms << "ms)";
 
     if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
@@ -663,7 +808,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
           "consider increasing --statestore_heartbeat_frequency_ms (currently 
$3) on "
           "this Statestore, and --statestore_subscriber_timeout_seconds "
           "on subscribers (Impala daemons and the Catalog Server)",
-          update.subscriber_id, hb_type, diff_ms,
+          update.subscriber_id, update_kind_str, diff_ms,
           FLAGS_statestore_heartbeat_frequency_ms);
       LOG(WARNING) << msg;
     }
@@ -674,7 +819,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
   } else {
     // The first update is scheduled immediately and has a deadline of 0. 
There's no need
     // to wait.
-    VLOG(3) << "Initial " << hb_type << " message for: " << 
update.subscriber_id;
+    VLOG(3) << "Initial " << update_kind_str << " message for: " << 
update.subscriber_id;
   }
 
   // Send the right message type, and compute the next deadline
@@ -692,18 +837,21 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, 
int thread_id,
     deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
   } else {
     bool update_skipped;
-    status = SendTopicUpdate(subscriber.get(), &update_skipped);
+    status = SendTopicUpdate(subscriber.get(), update_kind, &update_skipped);
     if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the 
stack
       status.AddDetail(Substitute(
           "Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
           subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
     }
-    // If the subscriber responded that it skipped the last update sent, we 
assume that
-    // it was busy doing something else, and back off slightly before sending 
another.
-    int64_t update_interval = update_skipped ?
-        (2 * FLAGS_statestore_update_frequency_ms) :
-        FLAGS_statestore_update_frequency_ms;
+    // If the subscriber responded that it skipped a topic in the last update 
sent,
+    // we assume that it was busy doing something else, and back off slightly 
before
+    // sending another.
+    int64_t update_frequency = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE
+        ? FLAGS_statestore_priority_update_frequency_ms
+        : FLAGS_statestore_update_frequency_ms;
+    int64_t update_interval = update_skipped ? (2 * update_frequency)
+                                                 : update_frequency;
     deadline_ms = UnixMillis() + update_interval;
   }
 
@@ -715,7 +863,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
     if (it == subscribers_.end() ||
         it->second->registration_id() != update.registration_id) return;
     if (!status.ok()) {
-      LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
+      LOG(INFO) << "Unable to send " << update_kind_str << " message to 
subscriber "
                 << update.subscriber_id << ", received error: " << 
status.GetDetail();
     }
 
@@ -736,10 +884,9 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int 
thread_id,
       VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " 
deadline for: "
               << subscriber->id() << " is in " << deadline_ms << "ms";
       status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, 
subscriber->id(),
-          subscriber->registration_id()), is_heartbeat ?
-          &subscriber_heartbeat_threadpool_ : 
&subscriber_topic_update_threadpool_);
+          subscriber->registration_id()), GetThreadPool(update_kind));
       if (!status.ok()) {
-        LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : 
"update")
+        LOG(INFO) << "Unable to send next " << update_kind_str
                   << " message to subscriber '" << subscriber->id() << "': "
                   << status.GetDetail();
       }
@@ -763,14 +910,11 @@ void Statestore::UnregisterSubscriber(Subscriber* 
subscriber) {
   failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));
 
   // Delete all transient entries
-  lock_guard<mutex> topic_lock(topic_lock_);
-  for (Statestore::Subscriber::TransientEntryMap::value_type entry:
-       subscriber->transient_entries()) {
-    Statestore::TopicMap::iterator topic_it = topics_.find(entry.first.first);
-    DCHECK(topic_it != topics_.end());
-    topic_it->second.DeleteIfVersionsMatch(entry.second, // version
-        entry.first.second); // key
+  {
+    shared_lock<shared_mutex> topic_lock(topics_map_lock_);
+    subscriber->DeleteAllTransientEntries(&topics_);
   }
+
   num_subscribers_metric_->Increment(-1L);
   subscriber_set_metric_->Remove(subscriber->id());
   subscribers_.erase(subscriber->id());
@@ -778,4 +922,6 @@ void Statestore::UnregisterSubscriber(Subscriber* 
subscriber) {
 
 void Statestore::MainLoop() {
   subscriber_topic_update_threadpool_.Join();
+  subscriber_priority_topic_update_threadpool_.Join();
+  subscriber_heartbeat_threadpool_.Join();
 }

Reply via email to