Repository: impala Updated Branches: refs/heads/2.x 09962ad9d -> 26309141d
http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index deeb5aa..3058e94 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -20,13 +20,16 @@ #include <cstdint> #include <map> +#include <memory> #include <string> #include <vector> +#include <boost/thread/shared_mutex.hpp> #include <boost/scoped_ptr.hpp> #include <boost/unordered_map.hpp> #include <boost/uuid/uuid_generators.hpp> +#include "common/atomic.h" #include "common/status.h" #include "gen-cpp/StatestoreService.h" #include "gen-cpp/StatestoreSubscriber.h" @@ -51,38 +54,60 @@ typedef TUniqueId RegistrationId; /// The Statestore is a soft-state key-value store that maintains a set of Topics, which /// are maps from string keys to byte array values. -// +/// /// Topics are subscribed to by subscribers, which are remote clients of the statestore /// which express an interest in some set of Topics. The statestore sends topic updates to /// subscribers via periodic 'update' messages, and also sends periodic 'heartbeat' -/// messages, which are used to detect the liveness of a subscriber. -// +/// messages, which are used to detect the liveness of a subscriber. Updates for each +/// topic are delivered sequentially to each subscriber per subscription. E.g. if a +/// subscriber is subscribed to a topic "foo", the statestore will not deliver topic +/// updates for "foo" out-of-order or concurrently, but the updates may be sent +/// concurrently or out-of-order with "bar". +/// /// In response to 'update' messages, subscribers, send topic updates to the statestore to /// merge with the current topic. These updates are then sent to all other subscribers in -/// their next update message. 
The next message is scheduled for -/// FLAGS_statestore_update_frequency_ms in the future, unless the subscriber indicated -/// that it skipped processing an update, in which case the statestore will back off -/// slightly before re-sending the same update. -// +/// their next update message. The next message is scheduled update_frequency in the +/// future, unless the subscriber indicated that it skipped processing an update, in which +/// case the statestore will back off slightly before re-sending a new update. The +/// update frequency is determined by FLAGS_statestore_update_frequency_ms or +/// FLAGS_statestore_priority_update_frequency_ms, depending on whether the topic is a +/// prioritized topic. +/// +/// Prioritized topics are topics that are small but important to deliver in a timely +/// manner. Handling those topics in a separate threadpool prevents large updates of other +/// topics slowing or blocking dissemination of updates to prioritized topics. +/// /// Topic entries usually have human-readable keys, and values which are some serialised /// representation of a data structure, e.g. a Thrift struct. The contents of a value's /// byte string is opaque to the statestore, which maintains no information about how to /// deserialise it. Subscribers must use convention to interpret each other's updates. -// +/// /// A subscriber may have marked some updates that it made as 'transient', which implies /// that those entries should be deleted once the subscriber is no longer connected (this /// is judged by the statestore's failure-detector, which will mark a subscriber as failed /// when it has not responded to a number of successive heartbeat messages). Transience /// is tracked per-topic-per-subscriber, so two different subscribers may treat the same /// topic differently wrt to the transience of their updates. 
-// +/// /// The statestore tracks the history of updates to each topic, with each topic update /// getting a sequentially increasing version number that is unique across the topic. -// +/// /// Subscribers also track the max version of each topic which they have have successfully /// processed. The statestore can use this information to send a delta of updates to a /// subscriber, rather than all items in the topic. For non-delta updates, the statestore /// will send an update that includes all values in the topic. +/// +/// +================+ +/// | Implementation | +/// +================+ +/// +/// Locking: +/// -------- +/// The lock acquisition order is: +/// 1. 'subscribers_lock_' +/// 2. 'topics_map_lock_' +/// 3. Subscriber::transient_entry_lock_ +/// 4. Topic::lock_ (terminal) class Statestore : public CacheLineAligned { public: /// A SubscriberId uniquely identifies a single subscriber, and is @@ -125,10 +150,14 @@ class Statestore : public CacheLineAligned { return thrift_iface_; } - /// Tells the Statestore to shut down. Does not wait for the processing loop to exit - /// before returning. - void SetExitFlag(); - + /// Names of prioritized topics that are handled in a separate threadpool. The topic + /// names are hardcoded here for expediency. Ideally we would have a more generic + /// interface for specifying prioritized topics, but for now we only have a small + /// fixed set of topics. + /// Topic tracking the set of live Impala daemon instances. + static const std::string IMPALA_MEMBERSHIP_TOPIC; + /// Topic tracking the state of admission control on all coordinators. + static const std::string IMPALA_REQUEST_QUEUE_TOPIC; private: /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string> /// pair. @@ -140,7 +169,7 @@ class Statestore : public CacheLineAligned { /// A version is a monotonically increasing counter. 
Each update to a topic has its own /// unique version with the guarantee that sequentially later updates have larger /// version numbers. - typedef uint64_t Version; + typedef int64_t Version; /// The Version value used to initialize a new TopicEntry. static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L; @@ -199,14 +228,14 @@ class Statestore : public CacheLineAligned { total_value_size_bytes_(0L), key_size_metric_(key_size_metric), value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { } - /// Adds an entry with the given key and value (bytes). If is_deleted is - /// true the entry is considered deleted, and may be garbage collected in the future. - /// The entry is assigned a new version number by the Topic, and that version number - /// is returned. - // - /// Must be called holding the topic lock - TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes, - bool is_deleted); + /// Add entries with the given keys and values. If is_deleted is true for an entry, + /// it is considered deleted, and may be garbage collected in the future. Each entry + /// is assigned a new version number by the Topic, and the version numbers are + /// returned. + /// + /// Safe to call concurrently from multiple threads (for different subscribers). + /// Acquires an exclusive write lock for the topic. + std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& entries); /// Utility method to support removing transient entries. We track the version numbers /// of entries added by subscribers, and remove entries with the same version number @@ -215,26 +244,37 @@ class Statestore : public CacheLineAligned { // /// Deletion means marking the entry as deleted and incrementing its version /// number. - // - /// Must be called holding the topic lock + /// + /// Safe to call concurrently from multiple threads (for different subscribers). + /// Acquires an exclusive write lock for the topic. 
void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key); - const TopicId& id() const { return topic_id_; } - const TopicEntryMap& entries() const { return entries_; } - TopicEntry::Version last_version() const { return last_version_; } - const TopicUpdateLog& topic_update_log() const { return topic_update_log_; } - int64_t total_key_size_bytes() const { return total_key_size_bytes_; } - int64_t total_value_size_bytes() const { return total_value_size_bytes_; } + /// Build a delta update to send to 'subscriber_id' including the deltas greater + /// than 'last_processed_version' (not inclusive). + /// + /// Safe to call concurrently from multiple threads (for different subscribers). + /// Acquires a shared read lock for the topic. + void BuildDelta(const SubscriberId& subscriber_id, + TopicEntry::Version last_processed_version, TTopicDelta* delta); + /// Adds entries representing the current topic state to 'topic_json'. + void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json); private: - /// Map from topic entry key to topic entry. - TopicEntryMap entries_; - /// Unique identifier for this topic. Should be human-readable. const TopicId topic_id_; - /// Tracks the last version that was assigned to an entry in this Topic. Incremented on - /// every Put() so each TopicEntry is tagged with a unique version value. + /// Reader-writer lock to protect state below. This is a terminal lock - no + /// other locks should be acquired while holding this one. boost::shared_mutex + /// gives writers priority over readers in acquiring the lock, which prevents + /// starvation. + boost::shared_mutex lock_; + + /// Map from topic entry key to topic entry. + TopicEntryMap entries_; + + /// Tracks the last version that was assigned to an entry in this Topic. Incremented + /// every time an entry is added in Put() so each TopicEntry is tagged with a unique + /// version value. 
TopicEntry::Version last_version_; /// Contains a history of updates to this Topic, with each key being a Version and the @@ -259,18 +299,12 @@ class Statestore : public CacheLineAligned { IntGauge* topic_size_metric_; }; - /// Note on locking: Subscribers and Topics should be accessed under their own coarse - /// locks, and worker threads will use worker_lock_ to ensure safe access to the - /// subscriber work queue. - - /// Protects access to exit_flag_, but is used mostly to ensure visibility of updates - /// between threads.. - boost::mutex exit_flag_lock_; - - bool exit_flag_; - - /// Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock. - boost::mutex topic_lock_; + /// Protects the 'topics_' map. Should be held shared when reading or holding a + /// reference to entries in the map and exclusively when modifying the map. + /// See the class comment for the lock acquisition order. boost::shared_mutex + /// gives writers priority over readers in acquiring the lock, which prevents + /// starvation. + boost::shared_mutex topics_map_lock_; /// The entire set of topics tracked by the statestore typedef boost::unordered_map<TopicId, Topic> TopicMap; @@ -287,40 +321,68 @@ class Statestore : public CacheLineAligned { const TNetworkAddress& network_address, const std::vector<TTopicRegistration>& subscribed_topics); - /// The TopicState contains information on whether entries written by this subscriber - /// should be considered transient, as well as the last topic entry version - /// successfully processed by this subscriber. - struct TopicState { - bool is_transient; - TopicEntry::Version last_version; + /// Information about a subscriber's subscription to a specific topic. + struct TopicSubscription { + TopicSubscription(bool is_transient) : is_transient(is_transient) {} + + /// Whether entries written by this subscriber should be considered transient. 
+ const bool is_transient; + + /// The last topic entry version successfully processed by this subscriber. Only + /// written by a single thread at a time but can be read concurrently. + AtomicInt64 last_version{TOPIC_INITIAL_VERSION}; + + /// Map from the key to the version of a transient update made by this subscriber. + /// Protected by Subscriber::transient_entry_lock_. + boost::unordered_map<TopicEntryKey, TopicEntry::Version> transient_entries_; }; /// The set of topics subscribed to, and current state (as seen by this subscriber) of /// the topic. - typedef boost::unordered_map<TopicId, TopicState> Topics; + typedef boost::unordered_map<TopicId, TopicSubscription> Topics; /// The Version value used to initialize new Topic subscriptions for this Subscriber. static const TopicEntry::Version TOPIC_INITIAL_VERSION; - const Topics& subscribed_topics() const { return subscribed_topics_; } + const Topics& non_priority_subscribed_topics() const { + return non_priority_subscribed_topics_; + } + const Topics& priority_subscribed_topics() const { return priority_subscribed_topics_; } const TNetworkAddress& network_address() const { return network_address_; } const SubscriberId& id() const { return subscriber_id_; } const RegistrationId& registration_id() const { return registration_id_; } - /// Records the fact that an update to this topic is owned by this subscriber. The - /// version number of the update is saved so that only those updates which are made - /// most recently by this subscriber - and not overwritten by another subscriber - are - /// deleted on failure. If the topic the entry belongs to is not marked as transient, - /// no update will be recorded. - void AddTransientUpdate(const TopicId& topic_id, const TopicEntryKey& topic_key, - TopicEntry::Version version); - - /// Map from the topic / key pair to the version of a transient update made by this - /// subscriber. 
- typedef boost::unordered_map<std::pair<TopicId, TopicEntryKey>, TopicEntry::Version> - TransientEntryMap; - - const TransientEntryMap& transient_entries() const { return transient_entries_; } + /// Get the Topics map that would be used to store 'topic_id'. + const Topics& GetTopicsMapForId(const TopicId& topic_id) const { + return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_ + : non_priority_subscribed_topics_; + } + Topics* GetTopicsMapForId(const TopicId& topic_id) { + return IsPrioritizedTopic(topic_id) ? &priority_subscribed_topics_ + : &non_priority_subscribed_topics_; + } + + /// Records the fact that updates to this topic are owned by this subscriber. The + /// version number of each update (which must be at the corresponding index in + /// 'entry_versions') is saved so that only those updates which are made most recently by + /// this subscriber - and not overwritten by another subscriber - are deleted on + /// failure. If the topic each entry belongs to is not marked as transient, no update + /// will be recorded. Should not be called concurrently from multiple threads for a + /// given 'topic_id'. + /// + /// Returns false if DeleteAllTransientEntries() was called and 'topic_id' entries + /// are transient, in which case the caller should delete the entries themselves. + bool AddTransientEntries(const TopicId& topic_id, + const std::vector<TTopicItem>& entries, + const std::vector<TopicEntry::Version>& entry_versions) WARN_UNUSED_RESULT; + + /// Delete all transient topic entries for this subscriber from 'global_topics'. + /// + /// Statestore::topics_map_lock_ (in shared mode) must be held by the caller. + void DeleteAllTransientEntries(TopicMap* global_topics); + + /// Returns the number of transient entries. + int64_t NumTransientEntries(); /// Returns the last version of the topic which this subscriber has successfully /// processed. Will never decrease. 
@@ -328,7 +390,8 @@ class Statestore : public CacheLineAligned { /// Sets the subscriber's last processed version of the topic to the given value. This /// should only be set when once a subscriber has succesfully processed the given - /// update corresponding to this version. + /// update corresponding to this version. Should not be called concurrently from + /// multiple threads for a given 'topic_id'. void SetLastTopicVersionProcessed(const TopicId& topic_id, TopicEntry::Version version); @@ -346,19 +409,25 @@ class Statestore : public CacheLineAligned { /// The location of the subscriber service that this subscriber runs. const TNetworkAddress network_address_; - /// Map of topic subscriptions to current TopicState. The the state describes whether - /// updates on the topic are 'transient' (i.e., to be deleted upon subscriber failure) - /// or not and contains the version number of the last update processed by this - /// Subscriber on the topic. - Topics subscribed_topics_; - - /// List of updates made by this subscriber so that transient entries may be deleted on - /// failure. - TransientEntryMap transient_entries_; + /// Maps of topic subscriptions to current TopicSubscription, with separate maps for + /// priority and non-priority topics. The state describes whether updates on the + /// topic are 'transient' (i.e., to be deleted upon subscriber failure) or not + /// and contains the version number of the last update processed by this Subscriber + /// on the topic. The set of keys is not modified after construction. + Topics priority_subscribed_topics_; + Topics non_priority_subscribed_topics_; + + /// Lock held when adding or deleting transient entries. See class comment for lock + /// acquisition order. + boost::mutex transient_entry_lock_; + + /// True once DeleteAllTransientEntries() has been called during subscriber + /// unregisteration. 
Protected by 'transient_entry_lock_' + bool unregistered_ = false; }; - /// Protects access to subscribers_ and subscriber_uuid_generator_. Must be taken before - /// topic_lock_. + /// Protects access to subscribers_ and subscriber_uuid_generator_. See the class + /// comment for the lock acquisition order. boost::mutex subscribers_lock_; /// Map of subscribers currently connected; upon failure their entry is removed from this @@ -393,11 +462,12 @@ class Statestore : public CacheLineAligned { registration_id(r_id) {} }; - /// The statestore has two pools of threads that send messages to subscribers + /// The statestore has three pools of threads that send messages to subscribers /// one-by-one. One pool deals with 'heartbeat' messages that update failure detection - /// state, and the other pool sends 'topic update' messages which contain the - /// actual topic data that a subscriber does not yet have. - // + /// state, and the remaining pools send 'topic update' messages that contain the + /// actual topic data that a subscriber does not yet have, with one pool dedicated to + /// a set of special "prioritized" topics. + /// /// Each message is scheduled for some time in the future and each worker thread /// will sleep until that time has passed to rate-limit messages. Subscribers are /// placed back into the queue once they have been processed. A subscriber may have many @@ -405,24 +475,31 @@ class Statestore : public CacheLineAligned { /// subscriber. Since at most one registration is considered 'live' per subscriber, this /// guarantees that subscribers_.size() - 1 'live' subscribers ahead of any subscriber in /// the queue. - // + /// /// Messages may be delayed for any number of reasons, including scheduler /// interference, lock unfairness when submitting to the thread pool and head-of-line /// blocking when threads are occupied sending messages to slow subscribers /// (subscribers are not guaranteed to be in the queue in next-update order). 
- // + /// /// Delays for heartbeat messages can result in the subscriber that is kept waiting /// assuming that the statestore has failed. Correct configuration of heartbeat message /// frequency and subscriber timeout is therefore very important, and depends upon the /// cluster size. See --statestore_heartbeat_frequency_ms and /// --statestore_subscriber_timeout_seconds. We expect that the provided defaults will /// work up to clusters of several hundred nodes. - // + /// /// Subscribers are therefore not processed in lock-step, and one subscriber may have /// seen many more messages than another during the same interval (if the second /// subscriber runs slow for any reason). + enum class UpdateKind { + TOPIC_UPDATE, + PRIORITY_TOPIC_UPDATE, + HEARTBEAT + }; ThreadPool<ScheduledSubscriberUpdate> subscriber_topic_update_threadpool_; + ThreadPool<ScheduledSubscriberUpdate> subscriber_priority_topic_update_threadpool_; + ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_; /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per @@ -452,10 +529,11 @@ class Statestore : public CacheLineAligned { IntGauge* value_size_metric_; IntGauge* topic_size_metric_; - /// Tracks the distribution of topic-update durations - precisely the time spent in - /// calling the UpdateState() RPC which allows us to measure the network transmission - /// cost as well as the subscriber-side processing time. + /// Tracks the distribution of topic-update durations for regular and prioritized topic + /// updates. This measures the time spent in calling the UpdateState() RPC which + /// includes network transmission cost and subscriber-side processing time. StatsMetric<double>* topic_update_duration_metric_; + StatsMetric<double>* priority_topic_update_duration_metric_; /// Same as above, but for SendHeartbeat() RPCs. 
StatsMetric<double>* heartbeat_duration_metric_; @@ -466,23 +544,25 @@ class Statestore : public CacheLineAligned { ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT; /// Sends either a heartbeat or topic update message to the subscriber in 'update' at - /// the closest possible time to the first member of 'update'. If is_heartbeat is true, - /// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once - /// complete, the next update is scheduled and added to the appropriate queue. - void DoSubscriberUpdate(bool is_heartbeat, int thread_id, + /// the closest possible time to the first member of 'update'. If 'update_kind' is + /// HEARTBEAT, sends a heartbeat update, otherwise the set of priority/non-priority + /// pending topic updates is sent. Once complete, the next update is scheduled and + /// added to the appropriate queue. + void DoSubscriberUpdate(UpdateKind update_kind, int thread_id, const ScheduledSubscriberUpdate& update); /// Does the work of updating a single subscriber, by calling UpdateState() on the client /// to send a list of topic deltas to the subscriber. If that call fails (either because /// the RPC could not be completed, or the subscriber indicated an error), this method /// returns a non-OK status immediately without further processing. - // + /// /// The subscriber may indicated that it skipped processing the message, either because /// it was not ready to do so or because it was busy. In that case, the UpdateState() RPC /// will return OK (since there was no error) and the output parameter update_skipped is /// set to true. Otherwise, any updates returned by the subscriber are applied to their /// target topics. - Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT; + Status SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind, + bool* update_skipped) WARN_UNUSED_RESULT; /// Sends a heartbeat message to subscriber. 
Returns false if there was some error /// performing the RPC. @@ -501,9 +581,10 @@ class Statestore : public CacheLineAligned { void UnregisterSubscriber(Subscriber* subscriber); /// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates - /// over all updates in all subscribed topics, populating the given TUpdateStateRequest - /// object. Takes the topic_lock_ and subscribers_lock_. - void GatherTopicUpdates(const Subscriber& subscriber, + /// over all updates in all priority or non-priority subscribed topics, based on + /// 'update_kind'. The given TUpdateStateRequest object is populated with the + /// changes to the subscribed topics. Takes the topics_map_lock_ and subscribers_lock_. + void GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind, TUpdateStateRequest* update_state_request); /// Returns the minimum last processed topic version across all subscribers for the given @@ -523,8 +604,14 @@ class Statestore : public CacheLineAligned { TopicEntry::Version GetMinSubscriberTopicVersion( const TopicId& topic_id, SubscriberId* subscriber_id = NULL); - /// True if the shutdown flag has been set true, false otherwise. - bool ShouldExit(); + /// Returns true if this topic should be handled by the priority pool. + static bool IsPrioritizedTopic(const std::string& topic); + + /// Return human-readable name for 'kind'. + static const char* GetUpdateKindName(UpdateKind kind); + + /// Return the thread pool to process updates of 'kind'. 
+ ThreadPool<ScheduledSubscriberUpdate>* GetThreadPool(UpdateKind kind); /// Webpage handler: upon return, 'document' will contain a list of topics as follows: /// "topics": [ http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/common/thrift/metrics.json ---------------------------------------------------------------------- diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index df78a0b..b457741 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -972,7 +972,18 @@ "key": "statestore-subscriber.topic-$0.processing-time-s" }, { - "description": "The time (sec) taken to process Statestore subcriber topic updates.", + "description": "Interval between topic updates for Topic $0", + "contexts": [ + "CATALOGSERVER", + "IMPALAD" + ], + "label": "Statestore Subscriber Topic $0 Update Interval", + "units": "TIME_S", + "kind": "STATS", + "key": "statestore-subscriber.topic-$0.update-interval" + }, + { + "description": "The time (sec) taken to process Statestore subscriber topic updates.", "contexts": [ "CATALOGSERVER", "IMPALAD" @@ -1024,7 +1035,7 @@ "key": "statestore.live-backends.list" }, { - "description": "The time (sec) spent sending topic update RPCs. Includes subscriber-side processing time and network transmission time.", + "description": "The time (sec) spent sending non-priority topic update RPCs. Includes subscriber-side processing time and network transmission time.", "contexts": [ "STATESTORE" ], @@ -1034,6 +1045,16 @@ "key": "statestore.topic-update-durations" }, { + "description": "The time (sec) spent sending priority topic update RPCs. 
Includes subscriber-side processing time and network transmission time.", + "contexts": [ + "STATESTORE" + ], + "label": "Statestore Priority Topic Update Durations", + "units": "TIME_S", + "kind": "STATS", + "key": "statestore.priority-topic-update-durations" + }, + { "description": "The sum of the size of all keys for all topics tracked by the StateStore.", "contexts": [ "STATESTORE" http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index ccbbd32..abf5a08 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -120,9 +120,13 @@ MAX_NUM_QUEUED_QUERIES = 10 # Mem limit (bytes) used in the mem limit test MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024 -_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\ - "-statestore_update_frequency_ms=%s" %\ - (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS) +_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} " + "-statestore_priority_update_frequency_ms={freq_ms}").format( + freq_ms=STATESTORE_RPC_FREQUENCY_MS) + +# Name of the subscriber metric tracking the admission control update interval. +REQUEST_QUEUE_UPDATE_INTERVAL =\ + 'statestore-subscriber.topic-impala-request-queue.update-interval' # Key in the query profile for the query options. 
PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): " @@ -552,14 +556,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): sleep(1) def wait_for_statestore_updates(self, heartbeats): - """Waits for a number of statestore heartbeats from all impalads.""" + """Waits for a number of admission control statestore updates from all impalads.""" start_time = time() num_impalads = len(self.impalads) init = dict() curr = dict() for impalad in self.impalads: - init[impalad] = impalad.service.get_metric_value(\ - 'statestore-subscriber.topic-update-interval-time')['count'] + init[impalad] = impalad.service.get_metric_value( + REQUEST_QUEUE_UPDATE_INTERVAL)['count'] curr[impalad] = init[impalad] while True: @@ -567,8 +571,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): init.values(), [curr[i] - init[i] for i in self.impalads]) if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break for impalad in self.impalads: - curr[impalad] = impalad.service.get_metric_value(\ - 'statestore-subscriber.topic-update-interval-time')['count'] + curr[impalad] = impalad.service.get_metric_value( + REQUEST_QUEUE_UPDATE_INTERVAL)['count'] assert (time() - start_time < STRESS_TIMEOUT),\ "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,) sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000)) http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index 1003dc7..ba51c8d 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -15,9 +15,11 @@ # specific language governing permissions and limitations # under the License. 
+from collections import defaultdict import json import socket import threading +import traceback import time import urllib2 import uuid @@ -167,7 +169,10 @@ class StatestoreSubscriber(object): more readable.""" def __init__(self, heartbeat_cb=None, update_cb=None): self.heartbeat_event, self.heartbeat_count = threading.Condition(), 0 - self.update_event, self.update_count = threading.Condition(), 0 + # Track the number of updates received per topic. + self.update_counts = defaultdict(lambda : 0) + # Variables to notify for updates on each topic. + self.update_event = threading.Condition() self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb self.exception = None @@ -191,12 +196,14 @@ class StatestoreSubscriber(object): """UpdateState RPC handler. Calls update callback if one exists.""" self.update_event.acquire() try: - self.update_count += 1 + for topic_name in args.topic_deltas: self.update_counts[topic_name] += 1 response = DEFAULT_UPDATE_STATE_RESPONSE if self.update_cb is not None and self.exception is None: try: response = self.update_cb(self, args) except Exception, e: + # Print the original backtrace so it doesn't get lost. + traceback.print_exc() self.exception = e self.update_event.notify() finally: @@ -278,21 +285,23 @@ class StatestoreSubscriber(object): finally: self.heartbeat_event.release() - def wait_for_update(self, count=None): - """Waits for some number of updates. If 'count' is provided, waits until the number - of updates seen by this subscriber exceeds count, otherwise waits for one further - update.""" + def wait_for_update(self, topic_name, count=None): + """Waits for some number of updates of 'topic_name'. 
If 'count' is provided, waits + until the number of updates seen by this subscriber reaches count, otherwise waits + for one further update.""" self.update_event.acquire() + start_time = time.time() try: - if count is not None and self.update_count >= count: return self - if count is None: count = self.update_count + 1 - while count > self.update_count: + if count is not None and self.update_counts[topic_name] >= count: return self + if count is None: count = self.update_counts[topic_name] + 1 + while count > self.update_counts[topic_name]: self.check_thread_exceptions() - last_count = self.update_count + last_count = self.update_counts[topic_name] self.update_event.wait(10) - if last_count == self.update_count: - raise Exception("Update not received within 10s (update count: %s)" % - self.update_count) + if (time.time() > start_time + 10 and + last_count == self.update_counts[topic_name]): + raise Exception("Update not received for %s within 10s (update count: %s)" % + (topic_name, last_count)) self.check_thread_exceptions() return self finally: @@ -340,14 +349,18 @@ class TestStatestore(): def topic_update_correct(sub, args): delta = self.make_topic_update(topic_name) - if sub.update_count == 1: + update_count = sub.update_counts[topic_name] + if topic_name not in args.topic_deltas: + # The update doesn't contain our topic. 
+ pass + elif update_count == 1: return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta], skipped=False) - elif sub.update_count == 2: - assert len(args.topic_deltas) == 1 + elif update_count == 2: + assert len(args.topic_deltas) == 1, args.topic_deltas assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries assert args.topic_deltas[topic_name].topic_name == delta.topic_name - elif sub.update_count == 3: + elif update_count == 3: # After the content-bearing update was processed, the next delta should be empty assert len(args.topic_deltas[topic_name].topic_entries) == 0 @@ -358,7 +371,7 @@ class TestStatestore(): ( sub.start() .register(topics=[reg]) - .wait_for_update(3) + .wait_for_update(topic_name, 3) ) def test_update_is_delta(self): @@ -368,14 +381,18 @@ class TestStatestore(): topic_name = "test_update_is_delta_%s" % uuid.uuid1() def check_delta(sub, args): - if sub.update_count == 1: + update_count = sub.update_counts[topic_name] + if topic_name not in args.topic_deltas: + # The update doesn't contain our topic. + pass + elif update_count == 1: assert args.topic_deltas[topic_name].is_delta == False delta = self.make_topic_update(topic_name) return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta], skipped=False) - elif sub.update_count == 2: + elif update_count == 2: assert args.topic_deltas[topic_name].is_delta == False - elif sub.update_count == 3: + elif update_count == 3: assert args.topic_deltas[topic_name].is_delta == True assert len(args.topic_deltas[topic_name].topic_entries) == 0 assert args.topic_deltas[topic_name].to_version == 1 @@ -387,7 +404,7 @@ class TestStatestore(): ( sub.start() .register(topics=[reg]) - .wait_for_update(3) + .wait_for_update(topic_name, 3) ) def test_skipped(self): @@ -395,7 +412,10 @@ class TestStatestore(): topic_name = "test_skipped_%s" % uuid.uuid1() def check_skipped(sub, args): - if sub.update_count == 1: + # Ignore responses that don't contain our topic. 
+ if topic_name not in args.topic_deltas: return DEFAULT_UPDATE_STATE_RESPONSE + update_count = sub.update_counts[topic_name] + if update_count == 1: update = self.make_topic_update(topic_name) return TUpdateStateResponse(status=STATUS_OK, topic_updates=[update], skipped=False) @@ -410,15 +430,17 @@ class TestStatestore(): ( sub.start() .register(topics=[reg]) - .wait_for_update(3) + .wait_for_update(topic_name, 3) ) def test_failure_detected(self): sub = StatestoreSubscriber() + topic_name = "test_failure_detected" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) ( sub.start() - .register() - .wait_for_update(1) + .register(topics=[reg]) + .wait_for_update(topic_name, 1) .kill() .wait_for_failure() ) @@ -428,10 +450,12 @@ class TestStatestore(): minutes) the statestore should time them out every 3s and then eventually fail after 40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)""" sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300)) + topic_name = "test_hung_heartbeat" + reg = TTopicRegistration(topic_name=topic_name, is_transient=True) ( sub.start() - .register() - .wait_for_update(1) + .register(topics=[reg]) + .wait_for_update(topic_name, 1) .wait_for_failure(timeout=60) ) @@ -443,21 +467,32 @@ class TestStatestore(): transient_topic_name = "test_topic_persistence_transient_%s" % topic_id def add_entries(sub, args): - if sub.update_count == 1: - updates = [self.make_topic_update(persistent_topic_name), - self.make_topic_update(transient_topic_name)] + # None of, one or both of the topics may be in the update. 
+ updates = [] + if (persistent_topic_name in args.topic_deltas and + sub.update_counts[persistent_topic_name] == 1): + updates.append(self.make_topic_update(persistent_topic_name)) + + if (transient_topic_name in args.topic_deltas and + sub.update_counts[transient_topic_name] == 1): + updates.append(self.make_topic_update(transient_topic_name)) + + if len(updates) > 0: return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates, skipped=False) - return DEFAULT_UPDATE_STATE_RESPONSE def check_entries(sub, args): - if sub.update_count == 1: - assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0 + # None of, one or both of the topics may be in the update. + if (persistent_topic_name in args.topic_deltas and + sub.update_counts[persistent_topic_name] == 1): assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1 # Statestore should not send deletions when the update is not a delta, see # IMPALA-1891 assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False + if (transient_topic_name in args.topic_deltas and + sub.update_counts[persistent_topic_name] == 1): + assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0 return DEFAULT_UPDATE_STATE_RESPONSE reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False), @@ -467,7 +502,8 @@ class TestStatestore(): ( sub.start() .register(topics=reg) - .wait_for_update(2) + .wait_for_update(persistent_topic_name, 2) + .wait_for_update(transient_topic_name, 2) .kill() .wait_for_failure() ) @@ -476,5 +512,6 @@ class TestStatestore(): ( sub2.start() .register(topics=reg) - .wait_for_update(1) + .wait_for_update(persistent_topic_name, 1) + .wait_for_update(transient_topic_name, 1) ) http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/www/statestore_subscribers.tmpl ---------------------------------------------------------------------- diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl index 
f117002..f57b4f6 100644 --- a/www/statestore_subscribers.tmpl +++ b/www/statestore_subscribers.tmpl @@ -25,6 +25,7 @@ under the License. <th>Id</th> <th>Address</th> <th>Subscribed topics</th> + <th>Subscribed priority topics</th> <th>Transient entries</th> <th>Registration ID</th> </tr> @@ -34,6 +35,7 @@ under the License. <td>{{id}}</td> <td>{{address}}</td> <td>{{num_topics}}</td> + <td>{{num_priority_topics}}</td> <td>{{num_transient}}</td> <td>{{registration_id}}</td> </tr> http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/www/statestore_topics.tmpl ---------------------------------------------------------------------- diff --git a/www/statestore_topics.tmpl b/www/statestore_topics.tmpl index 99f0dfe..56568f7 100644 --- a/www/statestore_topics.tmpl +++ b/www/statestore_topics.tmpl @@ -25,6 +25,7 @@ under the License. <th>Topic Id</th> <th>Number of entries</th> <th>Version</th> + <th>Prioritized</th> <th>Oldest subscriber version</th> <th>Oldest subscriber ID</th> <th>Size (keys / values / total)</th> @@ -35,6 +36,7 @@ under the License. <td>{{topic_id}}</td> <td>{{num_entries}}</td> <td>{{version}}</td> + <td>{{prioritized}}</td> <td>{{oldest_version}}</td> <td>{{oldest_id}}</td> <td>{{key_size}} / {{value_size}} / {{total_size}}</td>