http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index deeb5aa..3058e94 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -20,13 +20,16 @@
 
 #include <cstdint>
 #include <map>
+#include <memory>
 #include <string>
 #include <vector>
 
+#include <boost/thread/shared_mutex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "common/atomic.h"
 #include "common/status.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
@@ -51,38 +54,60 @@ typedef TUniqueId RegistrationId;
 
 /// The Statestore is a soft-state key-value store that maintains a set of 
Topics, which
 /// are maps from string keys to byte array values.
-//
+///
 /// Topics are subscribed to by subscribers, which are remote clients of the 
statestore
 /// which express an interest in some set of Topics. The statestore sends 
topic updates to
 /// subscribers via periodic 'update' messages, and also sends periodic 
'heartbeat'
-/// messages, which are used to detect the liveness of a subscriber.
-//
+/// messages, which are used to detect the liveness of a subscriber. Updates 
for each
+/// topic are delivered sequentially to each subscriber per subscription. E.g. 
if a
+/// subscriber is subscribed to a topic "foo", the statestore will not deliver 
topic
+/// updates for "foo" out-of-order or concurrently, but the updates may be sent
+/// concurrently or out-of-order with "bar".
+///
 /// In response to 'update' messages, subscribers, send topic updates to the 
statestore to
 /// merge with the current topic. These updates are then sent to all other 
subscribers in
-/// their next update message. The next message is scheduled for
-/// FLAGS_statestore_update_frequency_ms in the future, unless the subscriber 
indicated
-/// that it skipped processing an update, in which case the statestore will 
back off
-/// slightly before re-sending the same update.
-//
+/// their next update message. The next message is scheduled update_frequency 
in the
+/// future, unless the subscriber indicated that it skipped processing an 
update, in which
+/// case the statestore will back off slightly before re-sending a new update. 
The
+/// update frequency is determined by FLAGS_statestore_update_frequency_ms or
+/// FLAGS_statestore_priority_update_frequency_ms, depending on whether the topic 
is a
+/// prioritized topic.
+///
+/// Prioritized topics are topics that are small but important to deliver in 
a timely
+/// manner. Handling those topics in a separate threadpool prevents large 
updates of other
+/// topics from slowing or blocking dissemination of updates to prioritized topics.
+///
 /// Topic entries usually have human-readable keys, and values which are some 
serialised
 /// representation of a data structure, e.g. a Thrift struct. The contents of 
a value's
 /// byte string is opaque to the statestore, which maintains no information 
about how to
 /// deserialise it. Subscribers must use convention to interpret each other's 
updates.
-//
+///
 /// A subscriber may have marked some updates that it made as 'transient', 
which implies
 /// that those entries should be deleted once the subscriber is no longer 
connected (this
 /// is judged by the statestore's failure-detector, which will mark a 
subscriber as failed
 /// when it has not responded to a number of successive heartbeat messages). 
Transience
 /// is tracked per-topic-per-subscriber, so two different subscribers may 
treat the same
 /// topic differently wrt to the transience of their updates.
-//
+///
 /// The statestore tracks the history of updates to each topic, with each 
topic update
 /// getting a sequentially increasing version number that is unique across the 
topic.
-//
+///
 /// Subscribers also track the max version of each topic which they have have 
successfully
 /// processed. The statestore can use this information to send a delta of 
updates to a
 /// subscriber, rather than all items in the topic.  For non-delta updates, 
the statestore
 /// will send an update that includes all values in the topic.
+///
+/// +================+
+/// | Implementation |
+/// +================+
+///
+/// Locking:
+/// --------
+/// The lock acquisition order is:
+/// 1. 'subscribers_lock_'
+/// 2. 'topics_map_lock_'
+/// 3. Subscriber::transient_entry_lock_
+/// 4. Topic::lock_ (terminal)
 class Statestore : public CacheLineAligned {
  public:
   /// A SubscriberId uniquely identifies a single subscriber, and is
@@ -125,10 +150,14 @@ class Statestore : public CacheLineAligned {
     return thrift_iface_;
   }
 
-  /// Tells the Statestore to shut down. Does not wait for the processing loop 
to exit
-  /// before returning.
-  void SetExitFlag();
-
+  /// Names of prioritized topics that are handled in a separate threadpool. 
The topic
+  /// names are hardcoded here for expediency. Ideally we would have a more 
generic
+  /// interface for specifying prioritized topics, but for now we only have a 
small
+  /// fixed set of topics.
+  /// Topic tracking the set of live Impala daemon instances.
+  static const std::string IMPALA_MEMBERSHIP_TOPIC;
+  /// Topic tracking the state of admission control on all coordinators.
+  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, 
byte string>
   /// pair.
@@ -140,7 +169,7 @@ class Statestore : public CacheLineAligned {
     /// A version is a monotonically increasing counter. Each update to a 
topic has its own
     /// unique version with the guarantee that sequentially later updates have 
larger
     /// version numbers.
-    typedef uint64_t Version;
+    typedef int64_t Version;
 
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
@@ -199,14 +228,14 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), 
topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key and value (bytes). If is_deleted is
-    /// true the entry is considered deleted, and may be garbage collected in 
the future.
-    /// The entry is assigned a new version number by the Topic, and that 
version number
-    /// is returned.
-    //
-    /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& 
bytes,
-        bool is_deleted);
+    /// Add entries with the given keys and values. If is_deleted is true for 
an entry,
+    /// it is considered deleted, and may be garbage collected in the future. 
Each entry
+    /// is assigned a new version number by the Topic, and the version numbers 
are
+    /// returned.
+    ///
+    /// Safe to call concurrently from multiple threads (for different 
subscribers).
+    /// Acquires an exclusive write lock for the topic.
+    std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& 
entries);
 
     /// Utility method to support removing transient entries. We track the 
version numbers
     /// of entries added by subscribers, and remove entries with the same 
version number
@@ -215,26 +244,37 @@ class Statestore : public CacheLineAligned {
     //
     /// Deletion means marking the entry as deleted and incrementing its 
version
     /// number.
-    //
-    /// Must be called holding the topic lock
+    ///
+    /// Safe to call concurrently from multiple threads (for different 
subscribers).
+    /// Acquires an exclusive write lock for the topic.
     void DeleteIfVersionsMatch(TopicEntry::Version version, const 
TopicEntryKey& key);
 
-    const TopicId& id() const { return topic_id_; }
-    const TopicEntryMap& entries() const { return entries_; }
-    TopicEntry::Version last_version() const { return last_version_; }
-    const TopicUpdateLog& topic_update_log() const { return topic_update_log_; 
}
-    int64_t total_key_size_bytes() const { return total_key_size_bytes_; }
-    int64_t total_value_size_bytes() const { return total_value_size_bytes_; }
+    /// Build a delta update to send to 'subscriber_id' including the deltas 
greater
+    /// than 'last_processed_version' (not inclusive).
+    ///
+    /// Safe to call concurrently from multiple threads (for different 
subscribers).
+    /// Acquires a shared read lock for the topic.
+    void BuildDelta(const SubscriberId& subscriber_id,
+        TopicEntry::Version last_processed_version, TTopicDelta* delta);
 
+    /// Adds entries representing the current topic state to 'topic_json'.
+    void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);
    private:
-    /// Map from topic entry key to topic entry.
-    TopicEntryMap entries_;
-
     /// Unique identifier for this topic. Should be human-readable.
     const TopicId topic_id_;
 
-    /// Tracks the last version that was assigned to an entry in this Topic. 
Incremented on
-    /// every Put() so each TopicEntry is tagged with a unique version value.
+    /// Reader-writer lock to protect state below. This is a terminal lock - no
+    /// other locks should be acquired while holding this one. 
boost::shared_mutex
+    /// gives writers priority over readers in acquiring the lock, which 
prevents
+    /// starvation.
+    boost::shared_mutex lock_;
+
+    /// Map from topic entry key to topic entry.
+    TopicEntryMap entries_;
+
+    /// Tracks the last version that was assigned to an entry in this Topic. 
Incremented
+    /// every time an entry is added in Put() so each TopicEntry is tagged 
with a unique
+    /// version value.
     TopicEntry::Version last_version_;
 
     /// Contains a history of updates to this Topic, with each key being a 
Version and the
@@ -259,18 +299,12 @@ class Statestore : public CacheLineAligned {
     IntGauge* topic_size_metric_;
   };
 
-  /// Note on locking: Subscribers and Topics should be accessed under their 
own coarse
-  /// locks, and worker threads will use worker_lock_ to ensure safe access to 
the
-  /// subscriber work queue.
-
-  /// Protects access to exit_flag_, but is used mostly to ensure visibility 
of updates
-  /// between threads..
-  boost::mutex exit_flag_lock_;
-
-  bool exit_flag_;
-
-  /// Controls access to topics_. Cannot take subscribers_lock_ after 
acquiring this lock.
-  boost::mutex topic_lock_;
+  /// Protects the 'topics_' map. Should be held shared when reading or 
holding a
+  /// reference to entries in the map and exclusively when modifying the map.
+  /// See the class comment for the lock acquisition order. boost::shared_mutex
+  /// gives writers priority over readers in acquiring the lock, which prevents
+  /// starvation.
+  boost::shared_mutex topics_map_lock_;
 
   /// The entire set of topics tracked by the statestore
   typedef boost::unordered_map<TopicId, Topic> TopicMap;
@@ -287,40 +321,68 @@ class Statestore : public CacheLineAligned {
         const TNetworkAddress& network_address,
         const std::vector<TTopicRegistration>& subscribed_topics);
 
-    /// The TopicState contains information on whether entries written by this 
subscriber
-    /// should be considered transient, as well as the last topic entry version
-    /// successfully processed by this subscriber.
-    struct TopicState {
-      bool is_transient;
-      TopicEntry::Version last_version;
+    /// Information about a subscriber's subscription to a specific topic.
+    struct TopicSubscription {
+      TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+
+      /// Whether entries written by this subscriber should be considered 
transient.
+      const bool is_transient;
+
+      /// The last topic entry version successfully processed by this 
subscriber. Only
+      /// written by a single thread at a time but can be read concurrently.
+      AtomicInt64 last_version{TOPIC_INITIAL_VERSION};
+
+      /// Map from the key to the version of a transient update made by this 
subscriber.
+      /// Protected by Subscriber::transient_entry_lock_.
+      boost::unordered_map<TopicEntryKey, TopicEntry::Version> 
transient_entries_;
     };
 
     /// The set of topics subscribed to, and current state (as seen by this 
subscriber) of
     /// the topic.
-    typedef boost::unordered_map<TopicId, TopicState> Topics;
+    typedef boost::unordered_map<TopicId, TopicSubscription> Topics;
 
     /// The Version value used to initialize new Topic subscriptions for this 
Subscriber.
     static const TopicEntry::Version TOPIC_INITIAL_VERSION;
 
-    const Topics& subscribed_topics() const { return subscribed_topics_; }
+    const Topics& non_priority_subscribed_topics() const {
+      return non_priority_subscribed_topics_;
+    }
+    const Topics& priority_subscribed_topics() const { return 
priority_subscribed_topics_; }
     const TNetworkAddress& network_address() const { return network_address_; }
     const SubscriberId& id() const { return subscriber_id_; }
     const RegistrationId& registration_id() const { return registration_id_; }
 
-    /// Records the fact that an update to this topic is owned by this 
subscriber.  The
-    /// version number of the update is saved so that only those updates which 
are made
-    /// most recently by this subscriber - and not overwritten by another 
subscriber - are
-    /// deleted on failure. If the topic the entry belongs to is not marked as 
transient,
-    /// no update will be recorded.
-    void AddTransientUpdate(const TopicId& topic_id, const TopicEntryKey& 
topic_key,
-        TopicEntry::Version version);
-
-    /// Map from the topic / key pair to the version of a transient update 
made by this
-    /// subscriber.
-    typedef boost::unordered_map<std::pair<TopicId, TopicEntryKey>, 
TopicEntry::Version>
-        TransientEntryMap;
-
-    const TransientEntryMap& transient_entries() const { return 
transient_entries_; }
+    /// Get the Topics map that would be used to store 'topic_id'.
+    const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
+      return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
+                                          : non_priority_subscribed_topics_;
+    }
+    Topics* GetTopicsMapForId(const TopicId& topic_id) {
+      return IsPrioritizedTopic(topic_id) ? &priority_subscribed_topics_
+                                          : &non_priority_subscribed_topics_;
+    }
+
+    /// Records the fact that updates to this topic are owned by this 
subscriber.  The
+    /// version number of each update (which must be at the corresponding 
index in
+    /// 'entry_versions') is saved so that only those updates which are made most 
recently by
+    /// this subscriber - and not overwritten by another subscriber - are 
deleted on
+    /// failure. If the topic each entry belongs to is not marked as 
transient, no update
+    /// will be recorded. Should not be called concurrently from multiple 
threads for a
+    /// given 'topic_id'.
+    ///
+    /// Returns false if DeleteAllTransientEntries() was called and 'topic_id' 
entries
+    /// are transient, in which case the caller should delete the entries 
themselves.
+    bool AddTransientEntries(const TopicId& topic_id,
+        const std::vector<TTopicItem>& entries,
+        const std::vector<TopicEntry::Version>& entry_versions) 
WARN_UNUSED_RESULT;
+
+    /// Delete all transient topic entries for this subscriber from 
'global_topics'.
+    ///
+    /// Statestore::topics_map_lock_ (in shared mode) must be held by the 
caller.
+    void DeleteAllTransientEntries(TopicMap* global_topics);
+
+    /// Returns the number of transient entries.
+    int64_t NumTransientEntries();
 
     /// Returns the last version of the topic which this subscriber has 
successfully
     /// processed. Will never decrease.
@@ -328,7 +390,8 @@ class Statestore : public CacheLineAligned {
 
     /// Sets the subscriber's last processed version of the topic to the given 
value.  This
     /// should only be set when once a subscriber has succesfully processed 
the given
-    /// update corresponding to this version.
+    /// update corresponding to this version. Should not be called 
concurrently from
+    /// multiple threads for a given 'topic_id'.
     void SetLastTopicVersionProcessed(const TopicId& topic_id,
         TopicEntry::Version version);
 
@@ -346,19 +409,25 @@ class Statestore : public CacheLineAligned {
     /// The location of the subscriber service that this subscriber runs.
     const TNetworkAddress network_address_;
 
-    /// Map of topic subscriptions to current TopicState. The the state 
describes whether
-    /// updates on the topic are 'transient' (i.e., to be deleted upon 
subscriber failure)
-    /// or not and contains the version number of the last update processed by 
this
-    /// Subscriber on the topic.
-    Topics subscribed_topics_;
-
-    /// List of updates made by this subscriber so that transient entries may 
be deleted on
-    /// failure.
-    TransientEntryMap transient_entries_;
+    /// Maps of topic subscriptions to current TopicSubscription, with 
separate maps for
+    /// priority and non-priority topics. The state describes whether updates 
on the
+    /// topic are 'transient' (i.e., to be deleted upon subscriber failure) or 
not
+    /// and contains the version number of the last update processed by this 
Subscriber
+    /// on the topic. The set of keys is not modified after construction.
+    Topics priority_subscribed_topics_;
+    Topics non_priority_subscribed_topics_;
+
+    /// Lock held when adding or deleting transient entries. See class comment 
for lock
+    /// acquisition order.
+    boost::mutex transient_entry_lock_;
+
+    /// True once DeleteAllTransientEntries() has been called during subscriber
+    /// unregistration. Protected by 'transient_entry_lock_'.
+    bool unregistered_ = false;
   };
 
-  /// Protects access to subscribers_ and subscriber_uuid_generator_. Must be 
taken before
-  /// topic_lock_.
+  /// Protects access to subscribers_ and subscriber_uuid_generator_. See the 
class
+  /// comment for the lock acquisition order.
   boost::mutex subscribers_lock_;
 
   /// Map of subscribers currently connected; upon failure their entry is 
removed from this
@@ -393,11 +462,12 @@ class Statestore : public CacheLineAligned {
         registration_id(r_id) {}
   };
 
-  /// The statestore has two pools of threads that send messages to subscribers
+  /// The statestore has three pools of threads that send messages to 
subscribers
   /// one-by-one. One pool deals with 'heartbeat' messages that update failure 
detection
-  /// state, and the other pool sends 'topic update' messages which contain the
-  /// actual topic data that a subscriber does not yet have.
-  //
+  /// state, and the remaining pools send 'topic update' messages that contain 
the
+  /// actual topic data that a subscriber does not yet have, with one pool 
dedicated to
+  /// a set of special "prioritized" topics.
+  ///
   /// Each message is scheduled for some time in the future and each worker 
thread
   /// will sleep until that time has passed to rate-limit messages. 
Subscribers are
   /// placed back into the queue once they have been processed. A subscriber 
may have many
@@ -405,24 +475,31 @@ class Statestore : public CacheLineAligned {
   /// subscriber. Since at most one registration is considered 'live' per 
subscriber, this
   /// guarantees that subscribers_.size() - 1 'live' subscribers ahead of any 
subscriber in
   /// the queue.
-  //
+  ///
   /// Messages may be delayed for any number of reasons, including scheduler
   /// interference, lock unfairness when submitting to the thread pool and 
head-of-line
   /// blocking when threads are occupied sending messages to slow subscribers
   /// (subscribers are not guaranteed to be in the queue in next-update order).
-  //
+  ///
   /// Delays for heartbeat messages can result in the subscriber that is kept 
waiting
   /// assuming that the statestore has failed. Correct configuration of 
heartbeat message
   /// frequency and subscriber timeout is therefore very important, and 
depends upon the
   /// cluster size. See --statestore_heartbeat_frequency_ms and
   /// --statestore_subscriber_timeout_seconds. We expect that the provided 
defaults will
   /// work up to clusters of several hundred nodes.
-  //
+  ///
   /// Subscribers are therefore not processed in lock-step, and one subscriber 
may have
   /// seen many more messages than another during the same interval (if the 
second
   /// subscriber runs slow for any reason).
+  enum class UpdateKind {
+    TOPIC_UPDATE,
+    PRIORITY_TOPIC_UPDATE,
+    HEARTBEAT
+  };
   ThreadPool<ScheduledSubscriberUpdate> subscriber_topic_update_threadpool_;
 
+  ThreadPool<ScheduledSubscriberUpdate> 
subscriber_priority_topic_update_threadpool_;
+
   ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
 
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client 
per
@@ -452,10 +529,11 @@ class Statestore : public CacheLineAligned {
   IntGauge* value_size_metric_;
   IntGauge* topic_size_metric_;
 
-  /// Tracks the distribution of topic-update durations - precisely the time 
spent in
-  /// calling the UpdateState() RPC which allows us to measure the network 
transmission
-  /// cost as well as the subscriber-side processing time.
+  /// Tracks the distribution of topic-update durations for regular and 
prioritized topic
+  /// updates. This measures the time spent in calling the UpdateState() RPC 
which
+  /// includes network transmission cost and subscriber-side processing time.
   StatsMetric<double>* topic_update_duration_metric_;
+  StatsMetric<double>* priority_topic_update_duration_metric_;
 
   /// Same as above, but for SendHeartbeat() RPCs.
   StatsMetric<double>* heartbeat_duration_metric_;
@@ -466,23 +544,25 @@ class Statestore : public CacheLineAligned {
       ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
 
   /// Sends either a heartbeat or topic update message to the subscriber in 
'update' at
-  /// the closest possible time to the first member of 'update'. If 
is_heartbeat is true,
-  /// sends a heartbeat update, otherwise the set of pending topic updates is 
sent. Once
-  /// complete, the next update is scheduled and added to the appropriate 
queue.
-  void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+  /// the closest possible time to the first member of 'update'. If 
'update_kind' is
+  /// HEARTBEAT, sends a heartbeat update, otherwise the set of 
priority/non-priority
+  /// pending topic updates is sent. Once complete, the next update is 
scheduled and
+  /// added to the appropriate queue.
+  void DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
       const ScheduledSubscriberUpdate& update);
 
   /// Does the work of updating a single subscriber, by calling UpdateState() 
on the client
   /// to send a list of topic deltas to the subscriber. If that call fails 
(either because
   /// the RPC could not be completed, or the subscriber indicated an error), 
this method
   /// returns a non-OK status immediately without further processing.
-  //
+  ///
   /// The subscriber may indicated that it skipped processing the message, 
either because
   /// it was not ready to do so or because it was busy. In that case, the 
UpdateState() RPC
   /// will return OK (since there was no error) and the output parameter 
update_skipped is
   /// set to true. Otherwise, any updates returned by the subscriber are 
applied to their
   /// target topics.
-  Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) 
WARN_UNUSED_RESULT;
+  Status SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
+      bool* update_skipped) WARN_UNUSED_RESULT;
 
   /// Sends a heartbeat message to subscriber. Returns false if there was some 
error
   /// performing the RPC.
@@ -501,9 +581,10 @@ class Statestore : public CacheLineAligned {
   void UnregisterSubscriber(Subscriber* subscriber);
 
   /// Populates a TUpdateStateRequest with the update state for this 
subscriber. Iterates
-  /// over all updates in all subscribed topics, populating the given 
TUpdateStateRequest
-  /// object. Takes the topic_lock_ and subscribers_lock_.
-  void GatherTopicUpdates(const Subscriber& subscriber,
+  /// over all updates in all priority or non-priority subscribed topics, 
based on
+  /// 'update_kind'. The given TUpdateStateRequest object is populated with the
+  /// changes to the subscribed topics. Takes the topics_map_lock_ and 
subscribers_lock_.
+  void GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
       TUpdateStateRequest* update_state_request);
 
   /// Returns the minimum last processed topic version across all subscribers 
for the given
@@ -523,8 +604,14 @@ class Statestore : public CacheLineAligned {
   TopicEntry::Version GetMinSubscriberTopicVersion(
       const TopicId& topic_id, SubscriberId* subscriber_id = NULL);
 
-  /// True if the shutdown flag has been set true, false otherwise.
-  bool ShouldExit();
+  /// Returns true if this topic should be handled by the priority pool.
+  static bool IsPrioritizedTopic(const std::string& topic);
+
+  /// Return human-readable name for 'kind'.
+  static const char* GetUpdateKindName(UpdateKind kind);
+
+  /// Return the thread pool to process updates of 'kind'.
+  ThreadPool<ScheduledSubscriberUpdate>* GetThreadPool(UpdateKind kind);
 
   /// Webpage handler: upon return, 'document' will contain a list of topics 
as follows:
   /// "topics": [

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index df78a0b..b457741 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -972,7 +972,18 @@
     "key": "statestore-subscriber.topic-$0.processing-time-s"
   },
   {
-    "description": "The time (sec) taken to process Statestore subcriber topic 
updates.",
+    "description": "Interval between topic updates for Topic $0",
+    "contexts": [
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Statestore Subscriber Topic $0 Update Interval",
+    "units": "TIME_S",
+    "kind": "STATS",
+    "key": "statestore-subscriber.topic-$0.update-interval"
+  },
+  {
+    "description": "The time (sec) taken to process Statestore subscriber 
topic updates.",
     "contexts": [
       "CATALOGSERVER",
       "IMPALAD"
@@ -1024,7 +1035,7 @@
     "key": "statestore.live-backends.list"
   },
   {
-    "description": "The time (sec) spent sending topic update RPCs. Includes 
subscriber-side processing time and network transmission time.",
+    "description": "The time (sec) spent sending non-priority topic update 
RPCs. Includes subscriber-side processing time and network transmission time.",
     "contexts": [
       "STATESTORE"
     ],
@@ -1034,6 +1045,16 @@
     "key": "statestore.topic-update-durations"
   },
   {
+    "description": "The time (sec) spent sending priority topic update RPCs. 
Includes subscriber-side processing time and network transmission time.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Statestore Priority Topic Update Durations",
+    "units": "TIME_S",
+    "kind": "STATS",
+    "key": "statestore.priority-topic-update-durations"
+  },
+  {
     "description": "The sum of the size of all keys for all topics tracked by 
the StateStore.",
     "contexts": [
       "STATESTORE"

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 69cabd8..bed6994 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -120,9 +120,13 @@ MAX_NUM_QUEUED_QUERIES = 10
 # Mem limit (bytes) used in the mem limit test
 MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
 
-_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
-                    "-statestore_update_frequency_ms=%s" %\
-                    (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS)
+_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
+                     
"-statestore_priority_update_frequency_ms={freq_ms}").format(
+                    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+
+# Name of the subscriber metric tracking the admission control update interval.
+REQUEST_QUEUE_UPDATE_INTERVAL =\
+    'statestore-subscriber.topic-impala-request-queue.update-interval'
 
 # Key in the query profile for the query options.
 PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
@@ -551,14 +555,14 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
       sleep(1)
 
   def wait_for_statestore_updates(self, heartbeats):
-    """Waits for a number of statestore heartbeats from all impalads."""
+    """Waits for a number of admission control statestore updates from all 
impalads."""
     start_time = time()
     num_impalads = len(self.impalads)
     init = dict()
     curr = dict()
     for impalad in self.impalads:
-      init[impalad] = impalad.service.get_metric_value(\
-          'statestore-subscriber.topic-update-interval-time')['count']
+      init[impalad] = impalad.service.get_metric_value(
+          REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       curr[impalad] = init[impalad]
 
     while True:
@@ -566,8 +570,8 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           init.values(), [curr[i] - init[i] for i in self.impalads])
       if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break
       for impalad in self.impalads:
-        curr[impalad] = impalad.service.get_metric_value(\
-            'statestore-subscriber.topic-update-interval-time')['count']
+        curr[impalad] = impalad.service.get_metric_value(
+            REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
       sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py 
b/tests/statestore/test_statestore.py
index 1003dc7..ba51c8d 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -15,9 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import defaultdict
 import json
 import socket
 import threading
+import traceback
 import time
 import urllib2
 import uuid
@@ -167,7 +169,10 @@ class StatestoreSubscriber(object):
   more readable."""
   def __init__(self, heartbeat_cb=None, update_cb=None):
     self.heartbeat_event, self.heartbeat_count = threading.Condition(), 0
-    self.update_event, self.update_count = threading.Condition(), 0
+    # Track the number of updates received per topic.
+    self.update_counts = defaultdict(lambda : 0)
+    # Condition variable used to notify waiters of updates on any topic.
+    self.update_event = threading.Condition()
     self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
     self.exception = None
 
@@ -191,12 +196,14 @@ class StatestoreSubscriber(object):
     """UpdateState RPC handler. Calls update callback if one exists."""
     self.update_event.acquire()
     try:
-      self.update_count += 1
+      for topic_name in args.topic_deltas: self.update_counts[topic_name] += 1
       response = DEFAULT_UPDATE_STATE_RESPONSE
       if self.update_cb is not None and self.exception is None:
         try:
           response = self.update_cb(self, args)
         except Exception, e:
+          # Print the original backtrace so it doesn't get lost.
+          traceback.print_exc()
           self.exception = e
       self.update_event.notify()
     finally:
@@ -278,21 +285,23 @@ class StatestoreSubscriber(object):
     finally:
       self.heartbeat_event.release()
 
-  def wait_for_update(self, count=None):
-    """Waits for some number of updates. If 'count' is provided, waits until the number
-    of updates seen by this subscriber exceeds count, otherwise waits for one further
-    update."""
+  def wait_for_update(self, topic_name, count=None):
+    """Waits for some number of updates of 'topic_name'. If 'count' is provided, waits
+    until the number of updates seen by this subscriber exceeds count, otherwise waits
+    for one further update."""
     self.update_event.acquire()
+    start_time = time.time()
     try:
-      if count is not None and self.update_count >= count: return self
-      if count is None: count = self.update_count + 1
-      while count > self.update_count:
+      if count is not None and self.update_counts[topic_name] >= count: return self
+      if count is None: count = self.update_counts[topic_name] + 1
+      while count > self.update_counts[topic_name]:
         self.check_thread_exceptions()
-        last_count = self.update_count
+        last_count = self.update_counts[topic_name]
         self.update_event.wait(10)
-        if last_count == self.update_count:
-          raise Exception("Update not received within 10s (update count: %s)" %
-                          self.update_count)
+        if (time.time() > start_time + 10 and
+            last_count == self.update_counts[topic_name]):
+          raise Exception("Update not received for %s within 10s (update count: %s)" %
+                          (topic_name, last_count))
       self.check_thread_exceptions()
       return self
     finally:
@@ -340,14 +349,18 @@ class TestStatestore():
 
     def topic_update_correct(sub, args):
       delta = self.make_topic_update(topic_name)
-      if sub.update_count == 1:
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
                                     skipped=False)
-      elif sub.update_count == 2:
-        assert len(args.topic_deltas) == 1
+      elif update_count == 2:
+        assert len(args.topic_deltas) == 1, args.topic_deltas
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
-      elif sub.update_count == 3:
+      elif update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
 
@@ -358,7 +371,7 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_update_is_delta(self):
@@ -368,14 +381,18 @@ class TestStatestore():
     topic_name = "test_update_is_delta_%s" % uuid.uuid1()
 
     def check_delta(sub, args):
-      if sub.update_count == 1:
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
         assert args.topic_deltas[topic_name].is_delta == False
         delta = self.make_topic_update(topic_name)
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
                                     skipped=False)
-      elif sub.update_count == 2:
+      elif update_count == 2:
         assert args.topic_deltas[topic_name].is_delta == False
-      elif sub.update_count == 3:
+      elif update_count == 3:
         assert args.topic_deltas[topic_name].is_delta == True
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
         assert args.topic_deltas[topic_name].to_version == 1
@@ -387,7 +404,7 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_skipped(self):
@@ -395,7 +412,10 @@ class TestStatestore():
     topic_name = "test_skipped_%s" % uuid.uuid1()
 
     def check_skipped(sub, args):
-      if sub.update_count == 1:
+      # Ignore responses that don't contain our topic.
+      if topic_name not in args.topic_deltas: return DEFAULT_UPDATE_STATE_RESPONSE
+      update_count = sub.update_counts[topic_name]
+      if update_count == 1:
         update = self.make_topic_update(topic_name)
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[update],
                                     skipped=False)
@@ -410,15 +430,17 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_failure_detected(self):
     sub = StatestoreSubscriber()
+    topic_name = "test_failure_detected"
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
     (
       sub.start()
-         .register()
-         .wait_for_update(1)
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 1)
          .kill()
          .wait_for_failure()
     )
@@ -428,10 +450,12 @@ class TestStatestore():
     minutes) the statestore should time them out every 3s and then eventually fail after
     40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
     sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300))
+    topic_name = "test_hung_heartbeat"
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
     (
       sub.start()
-         .register()
-         .wait_for_update(1)
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 1)
          .wait_for_failure(timeout=60)
     )
 
@@ -443,21 +467,32 @@ class TestStatestore():
     transient_topic_name = "test_topic_persistence_transient_%s" % topic_id
 
     def add_entries(sub, args):
-      if sub.update_count == 1:
-        updates = [self.make_topic_update(persistent_topic_name),
-                   self.make_topic_update(transient_topic_name)]
+      # None of, one or both of the topics may be in the update.
+      updates = []
+      if (persistent_topic_name in args.topic_deltas and
+          sub.update_counts[persistent_topic_name] == 1):
+        updates.append(self.make_topic_update(persistent_topic_name))
+
+      if (transient_topic_name in args.topic_deltas and
+          sub.update_counts[transient_topic_name] == 1):
+        updates.append(self.make_topic_update(transient_topic_name))
+
+      if len(updates) > 0:
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
                                     skipped=False)
-
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     def check_entries(sub, args):
-      if sub.update_count == 1:
-        assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
+      # None of, one or both of the topics may be in the update.
+      if (persistent_topic_name in args.topic_deltas and
+          sub.update_counts[persistent_topic_name] == 1):
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
         assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
+      if (transient_topic_name in args.topic_deltas and
+          sub.update_counts[transient_topic_name] == 1):
+        assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
@@ -467,7 +502,8 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=reg)
-         .wait_for_update(2)
+         .wait_for_update(persistent_topic_name, 2)
+         .wait_for_update(transient_topic_name, 2)
          .kill()
          .wait_for_failure()
     )
@@ -476,5 +512,6 @@ class TestStatestore():
     (
       sub2.start()
           .register(topics=reg)
-          .wait_for_update(1)
+          .wait_for_update(persistent_topic_name, 1)
+          .wait_for_update(transient_topic_name, 1)
     )

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f117002..f57b4f6 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -25,6 +25,7 @@ under the License.
   <th>Id</th>
   <th>Address</th>
   <th>Subscribed topics</th>
+  <th>Subscribed priority topics</th>
   <th>Transient entries</th>
   <th>Registration ID</th>
 </tr>
@@ -34,6 +35,7 @@ under the License.
   <td>{{id}}</td>
   <td>{{address}}</td>
   <td>{{num_topics}}</td>
+  <td>{{num_priority_topics}}</td>
   <td>{{num_transient}}</td>
   <td>{{registration_id}}</td>
 </tr>

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_topics.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_topics.tmpl b/www/statestore_topics.tmpl
index 99f0dfe..56568f7 100644
--- a/www/statestore_topics.tmpl
+++ b/www/statestore_topics.tmpl
@@ -25,6 +25,7 @@ under the License.
   <th>Topic Id</th>
   <th>Number of entries</th>
   <th>Version</th>
+  <th>Prioritized</th>
   <th>Oldest subscriber version</th>
   <th>Oldest subscriber ID</th>
   <th>Size (keys / values / total)</th>
@@ -35,6 +36,7 @@ under the License.
   <td>{{topic_id}}</td>
   <td>{{num_entries}}</td>
   <td>{{version}}</td>
+  <td>{{prioritized}}</td>
   <td>{{oldest_version}}</td>
   <td>{{oldest_id}}</td>
   <td>{{key_size}} / {{value_size}} / {{total_size}}</td>

Reply via email to