This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d75807a195273cdba29e33f00b7b6f9bee012f62 Author: Kurt Deschler <[email protected]> AuthorDate: Fri Aug 18 07:44:44 2023 -0500 IMPALA-12385: Enable Periodic metrics by default This patch enables periodic metrics in query profiles by default and changes the metric collectors to be more suitable for mixed workloads. -Changed default of resource_trace_ratio to 1. -Changed profile metrics to use sampling counters which can automatically resize for long queries. -Reduced profile metric sampling interval to 50ms to support short-running queries. -Changed fragment metrics to use the same sampling interval as periodic metrics. -Added singleton PeriodicCounterUpdater and thread for updating system (KRPC) metrics at a different period than fragment metrics. -Added new flag periodic_system_counter_update_period_ms for configuring system metric update period. Default is 500ms. Example profile output: - HostCpuIoWaitPercentage (400.000ms): 1, 0, 2, 3, 4, 6, 5, 2, 1, ... - HostCpuSysPercentage (400.000ms): 1, 12, 10, 11, 11, 5, 3, 3, 3, ... - HostCpuUserPercentage (400.000ms): 8, 46, 39, 39, 39, 29, 22, 23, ... 
Testing: -Updated runtime-profile-test and test_observability.py for new defaults -Manual inspection of query profile metrics for long-running queries -Tested for performance regression using 50 concurrent TPCH Q1 and with periodic_counter_update_period_ms=1 to verify that the periodic update loop does not affect peformance or become a bottleneck Change-Id: Ic8e5cbfd4b324081158574ceb8f4b3a062a69fd1 Reviewed-on: http://gerrit.cloudera.org:8080/20377 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Michael Smith <[email protected]> --- be/src/runtime/exec-env.cc | 2 +- be/src/runtime/krpc-data-stream-recvr.cc | 6 +- be/src/runtime/query-state.cc | 14 ++--- be/src/util/periodic-counter-updater.cc | 103 ++++++++++++++++++++----------- be/src/util/periodic-counter-updater.h | 22 +++++-- be/src/util/runtime-profile-counters.h | 12 +++- be/src/util/runtime-profile-test.cc | 3 +- be/src/util/runtime-profile.cc | 16 +++-- be/src/util/runtime-profile.h | 11 +++- be/src/util/streaming-sampler.h | 2 +- common/thrift/Query.thrift | 2 +- tests/query_test/test_observability.py | 37 +++++------ 12 files changed, 148 insertions(+), 82 deletions(-) diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 8f2768e49..c31bd597f 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -637,7 +637,7 @@ void ExecEnv::InitSystemStateInfo() { system_state_info_.reset(new SystemStateInfo()); PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() { s->CaptureSystemStateSnapshot(); - }); + }, true); } Status ExecEnv::GetKuduClient(const vector<string>& master_addresses, diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc index a3b804e7f..394fe64cb 100644 --- a/be/src/runtime/krpc-data-stream-recvr.cc +++ b/be/src/runtime/krpc-data-stream-recvr.cc @@ -707,7 +707,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, // Initialize various counters for 
measuring dequeuing from queues. bytes_dequeued_counter_ = ADD_COUNTER(dequeue_profile_, "TotalBytesDequeued", TUnit::BYTES); - bytes_dequeued_time_series_counter_ = ADD_TIME_SERIES_COUNTER( + bytes_dequeued_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER( dequeue_profile_, "BytesDequeued", bytes_dequeued_counter_); queue_get_batch_timer_ = ADD_TIMER(dequeue_profile_, "TotalGetBatchTime"); data_wait_timer_ = @@ -719,7 +719,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, // Initialize various counters for measuring enqueuing into queues. bytes_received_counter_ = ADD_COUNTER(enqueue_profile_, "TotalBytesReceived", TUnit::BYTES); - bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER( + bytes_received_time_series_counter_ = ADD_SYSTEM_TIME_SERIES_COUNTER( enqueue_profile_, "BytesReceived", bytes_received_counter_); deserialize_row_batch_timer_ = ADD_TIMER(enqueue_profile_, "DeserializeRowBatchTime"); @@ -735,7 +735,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT); deferred_rpcs_time_series_counter_ = enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT, - bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this)); + bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this), true); total_has_deferred_rpcs_timer_ = ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime"); dispatch_timer_ = diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 31f833b8d..5738448e8 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -218,33 +218,33 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params, // Initialize resource tracking counters. 
if (query_ctx().trace_resource_usage) { SystemStateInfo* system_state_info = exec_env->system_state_info(); - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () { return system_state_info->GetCpuUsageRatios().user; }); - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () { return system_state_info->GetCpuUsageRatios().system; }); - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () { return system_state_info->GetCpuUsageRatios().iowait; }); // Add network usage - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () { return system_state_info->GetNetworkUsage().rx_rate; }); - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () { return system_state_info->GetNetworkUsage().tx_rate; }); // Add disk stats - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostDiskReadThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () { return system_state_info->GetDiskStats().read_rate; }); - host_profile_->AddChunkedTimeSeriesCounter( + host_profile_->AddSamplingTimeSeriesCounter( "HostDiskWriteThroughput", TUnit::BYTES_PER_SECOND, [system_state_info] () { return system_state_info->GetDiskStats().write_rate; }); diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc index 8bef6aa9a..edb8a4d14 100644 --- a/be/src/util/periodic-counter-updater.cc +++ b/be/src/util/periodic-counter-updater.cc @@ -26,25 +26,43 @@ namespace posix_time = boost::posix_time; using 
boost::get_system_time; using boost::system_time; -// Period to update rate counters and sampling counters in ms. -DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and" - " sampling counters in ms"); +// Period to update query profile rate counters and sampling counters in ms. +DEFINE_int32(periodic_counter_update_period_ms, 50, "Period to update" + " query profile rate counters and sampling counters in ms"); + +// Period to update system-level rate counters and sampling counters in ms. +DEFINE_int32(periodic_system_counter_update_period_ms, 500, "Period to update" + " system-level rate counters and sampling counters in ms"); namespace impala { +// Updater for profile counters PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr; +// Updater for system counters +PeriodicCounterUpdater* PeriodicCounterUpdater::system_instance_ = nullptr; + void PeriodicCounterUpdater::Init() { - DCHECK(instance_ == nullptr); - // Create the singleton, which will live until the process terminates. - instance_ = new PeriodicCounterUpdater; + DCHECK(instance_ == nullptr && system_instance_ == nullptr); + // Create two singletons, which will live until the process terminates. 
+ instance_ = new PeriodicCounterUpdater(FLAGS_periodic_counter_update_period_ms); + instance_->update_thread_.reset( - new thread(&PeriodicCounterUpdater::UpdateLoop, instance_)); + new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, instance_, instance_))); + + system_instance_ = + new PeriodicCounterUpdater(FLAGS_periodic_system_counter_update_period_ms); + + system_instance_->update_thread_.reset( + new thread(boost::bind(&PeriodicCounterUpdater::UpdateLoop, system_instance_, + system_instance_))); + } -void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) { - lock_guard<SpinLock> l(instance_->update_fns_lock_); - instance_->update_fns_.push_back(update_fn); +void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn, bool is_system) { + PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_; + lock_guard<SpinLock> l(instance->update_fns_lock_); + instance->update_fns_.push_back(update_fn); } void PeriodicCounterUpdater::RegisterPeriodicCounter( @@ -52,6 +70,9 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter( RuntimeProfile::SampleFunction sample_fn, RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) { DCHECK(src_counter == NULL || sample_fn == NULL); + DCHECK(src_counter == NULL || src_counter->GetIsSystem() == dst_counter->GetIsSystem()); + PeriodicCounterUpdater* instance = dst_counter->GetIsSystem() ? 
+ system_instance_ : instance_; switch (type) { case RATE_COUNTER: { @@ -59,8 +80,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter( counter.src_counter = src_counter; counter.sample_fn = sample_fn; counter.elapsed_ms = 0; - lock_guard<SpinLock> ratelock(instance_->rate_lock_); - instance_->rate_counters_[dst_counter] = counter; + lock_guard<SpinLock> ratelock(instance->rate_lock_); + instance->rate_counters_[dst_counter] = counter; break; } case SAMPLING_COUNTER: { @@ -69,8 +90,8 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter( counter.sample_fn = sample_fn; counter.num_sampled = 0; counter.total_sampled_value = 0; - lock_guard<SpinLock> samplinglock(instance_->sampling_lock_); - instance_->sampling_counters_[dst_counter] = counter; + lock_guard<SpinLock> samplinglock(instance->sampling_lock_); + instance->sampling_counters_[dst_counter] = counter; break; } default: @@ -79,35 +100,43 @@ void PeriodicCounterUpdater::RegisterPeriodicCounter( } void PeriodicCounterUpdater::StopRateCounter(RuntimeProfile::Counter* counter) { - lock_guard<SpinLock> ratelock(instance_->rate_lock_); - instance_->rate_counters_.erase(counter); + PeriodicCounterUpdater* instance = counter->GetIsSystem() ? + system_instance_ : instance_; + lock_guard<SpinLock> ratelock(instance->rate_lock_); + instance->rate_counters_.erase(counter); } void PeriodicCounterUpdater::StopSamplingCounter(RuntimeProfile::Counter* counter) { - lock_guard<SpinLock> samplinglock(instance_->sampling_lock_); - instance_->sampling_counters_.erase(counter); + PeriodicCounterUpdater* instance = counter->GetIsSystem() ? + system_instance_ : instance_; + lock_guard<SpinLock> samplinglock(instance->sampling_lock_); + instance->sampling_counters_.erase(counter); } void PeriodicCounterUpdater::RegisterBucketingCounters( RuntimeProfile::Counter* src_counter, vector<RuntimeProfile::Counter*>* buckets) { + PeriodicCounterUpdater* instance = src_counter->GetIsSystem() ? 
+ system_instance_ : instance_; BucketCountersInfo info; info.src_counter = src_counter; info.num_sampled = 0; - lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_); - instance_->bucketing_counters_[buckets] = info; + lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_); + instance->bucketing_counters_[buckets] = info; } void PeriodicCounterUpdater::StopBucketingCounters( - vector<RuntimeProfile::Counter*>* buckets) { + vector<RuntimeProfile::Counter*>* buckets, bool is_system) { int64_t num_sampled = 0; + PeriodicCounterUpdater* instance = is_system ? system_instance_ : instance_; { - lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_); + lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_); BucketCountersMap::iterator itr = - instance_->bucketing_counters_.find(buckets); + instance->bucketing_counters_.find(buckets); // If not registered, we have nothing to do. - if (itr == instance_->bucketing_counters_.end()) return; + if (itr == instance->bucketing_counters_.end()) return; + DCHECK(is_system == itr->second.src_counter->GetIsSystem()); num_sampled = itr->second.num_sampled; - instance_->bucketing_counters_.erase(itr); + instance->bucketing_counters_.erase(itr); } if (num_sampled > 0) { @@ -120,20 +149,24 @@ void PeriodicCounterUpdater::StopBucketingCounters( void PeriodicCounterUpdater::RegisterTimeSeriesCounter( RuntimeProfile::TimeSeriesCounter* counter) { - lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_); - instance_->time_series_counters_.insert(counter); + PeriodicCounterUpdater* instance = counter->GetIsSystem() ? 
+ system_instance_ : instance_; + lock_guard<SpinLock> timeserieslock(instance->time_series_lock_); + instance->time_series_counters_.insert(counter); } void PeriodicCounterUpdater::StopTimeSeriesCounter( RuntimeProfile::TimeSeriesCounter* counter) { - lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_); - instance_->time_series_counters_.erase(counter); + PeriodicCounterUpdater* instance = counter->GetIsSystem() ? + system_instance_ : instance_; + lock_guard<SpinLock> timeserieslock(instance->time_series_lock_); + instance->time_series_counters_.erase(counter); } -void PeriodicCounterUpdater::UpdateLoop() { +void PeriodicCounterUpdater::UpdateLoop(PeriodicCounterUpdater* instance) { while (true) { system_time before_time = get_system_time(); - SleepForMs(FLAGS_periodic_counter_update_period_ms); + SleepForMs(update_period_); posix_time::time_duration elapsed = get_system_time() - before_time; int elapsed_ms = elapsed.total_milliseconds(); @@ -143,7 +176,7 @@ void PeriodicCounterUpdater::UpdateLoop() { } { - lock_guard<SpinLock> ratelock(instance_->rate_lock_); + lock_guard<SpinLock> ratelock(instance->rate_lock_); for (RateCounterMap::iterator it = rate_counters_.begin(); it != rate_counters_.end(); ++it) { it->second.elapsed_ms += elapsed_ms; @@ -160,7 +193,7 @@ void PeriodicCounterUpdater::UpdateLoop() { } { - lock_guard<SpinLock> samplinglock(instance_->sampling_lock_); + lock_guard<SpinLock> samplinglock(instance->sampling_lock_); for (SamplingCounterMap::iterator it = sampling_counters_.begin(); it != sampling_counters_.end(); ++it) { ++it->second.num_sampled; @@ -179,7 +212,7 @@ void PeriodicCounterUpdater::UpdateLoop() { } { - lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_); + lock_guard<SpinLock> bucketinglock(instance->bucketing_lock_); for (BucketCountersMap::iterator it = bucketing_counters_.begin(); it != bucketing_counters_.end(); ++it) { int64_t val = it->second.src_counter->value(); @@ -190,7 +223,7 @@ void 
PeriodicCounterUpdater::UpdateLoop() { } { - lock_guard<SpinLock> timeserieslock(instance_->time_series_lock_); + lock_guard<SpinLock> timeserieslock(instance->time_series_lock_); for (TimeSeriesCounters::iterator it = time_series_counters_.begin(); it != time_series_counters_.end(); ++it) { (*it)->AddSample(elapsed_ms); diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h index 0f92070e9..6b3ef0eaf 100644 --- a/be/src/util/periodic-counter-updater.h +++ b/be/src/util/periodic-counter-updater.h @@ -39,6 +39,11 @@ namespace impala { /// future stale samples from polluting the useful values. class PeriodicCounterUpdater { public: + + PeriodicCounterUpdater(const int32_t update_period) + : update_period_(update_period) { + } + enum PeriodicCounterType { RATE_COUNTER = 0, SAMPLING_COUNTER, @@ -52,7 +57,7 @@ class PeriodicCounterUpdater { /// Registers an update function that will be called before individual counters will be /// updated. This can be used to update some global metric once before reading it /// through individual counters. - static void RegisterUpdateFunction(UpdateFn update_fn); + static void RegisterUpdateFunction(UpdateFn update_fn, bool is_system); /// Registers a periodic counter to be updated by the update thread. /// Either sample_fn or dst_counter must be non-NULL. When the periodic counter @@ -80,7 +85,8 @@ class PeriodicCounterUpdater { /// convert the buckets from count to percentage. If not registered, has no effect. /// Perioidic counters are updated periodically so should be removed as soon as the /// underlying counter is no longer going to change. - static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets); + static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets, + bool is_system = false); /// Stops 'counter' from receiving any more samples. 
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter); @@ -107,7 +113,7 @@ class PeriodicCounterUpdater { /// Loop for periodic counter update thread. This thread wakes up once in a while /// and updates all the added rate counters and sampling counters. - [[noreturn]] void UpdateLoop(); + [[noreturn]] void UpdateLoop(PeriodicCounterUpdater* instance); /// Thread performing asynchronous updates. boost::scoped_ptr<boost::thread> update_thread_; @@ -149,8 +155,14 @@ class PeriodicCounterUpdater { typedef boost::unordered_set<RuntimeProfile::TimeSeriesCounter*> TimeSeriesCounters; TimeSeriesCounters time_series_counters_; - /// Singleton object that keeps track of all rate counters and the thread - /// for updating them. + /// Singleton object that keeps track of all profile rate counters and the thread + /// for updating them. Interval set by flag periodic_counter_update_period_ms. static PeriodicCounterUpdater* instance_; + + /// Singleton object that keeps track of all system rate counters and the thread + /// for updating them. Interval set by flag periodic_system_counter_update_period_ms. 
+ static PeriodicCounterUpdater* system_instance_; + + int32_t update_period_; }; } diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h index 316c2d083..8b12662e1 100644 --- a/be/src/util/runtime-profile-counters.h +++ b/be/src/util/runtime-profile-counters.h @@ -54,6 +54,8 @@ namespace impala { #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit) #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \ (profile)->AddSamplingTimeSeriesCounter(name, src_counter) + #define ADD_SYSTEM_TIME_SERIES_COUNTER(profile, name, src_counter) \ + (profile)->AddSamplingTimeSeriesCounter(name, src_counter, true) #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS) #define ADD_SUMMARY_STATS_TIMER(profile, name) \ (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS) @@ -754,6 +756,9 @@ class RuntimeProfile::TimeSeriesCounter { TUnit::type unit() const { return unit_; } + void SetIsSystem() { is_system_ = true; } + bool GetIsSystem() const { return is_system_; } + private: friend class RuntimeProfile; @@ -794,7 +799,7 @@ class RuntimeProfile::TimeSeriesCounter { protected: TimeSeriesCounter(const std::string& name, TUnit::type unit, SampleFunction fn = SampleFunction()) - : name_(name), unit_(unit), sample_fn_(fn) {} + : name_(name), unit_(unit), sample_fn_(fn), is_system_(false) {} std::string name_; TUnit::type unit_; @@ -802,6 +807,7 @@ class RuntimeProfile::TimeSeriesCounter { /// The number of samples that have been retrieved and cleared from this counter. 
int64_t previous_sample_count_ = 0; mutable SpinLock lock_; + bool is_system_; }; typedef StreamingSampler<int64_t, 64> StreamingCounterSampler; @@ -811,8 +817,8 @@ class RuntimeProfile::SamplingTimeSeriesCounter friend class RuntimeProfile; SamplingTimeSeriesCounter( - const std::string& name, TUnit::type unit, SampleFunction fn) - : TimeSeriesCounter(name, unit, fn) {} + const std::string& name, TUnit::type unit, SampleFunction fn, int initial_period) + : TimeSeriesCounter(name, unit, fn), samples_(initial_period) {} virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override; virtual const int64_t* GetSamplesLocked( int* num_samples, int* period) const override; diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc index bd5f4903f..3a446e897 100644 --- a/be/src/util/runtime-profile-test.cc +++ b/be/src/util/runtime-profile-test.cc @@ -1105,7 +1105,7 @@ void ValidateSampler(const StreamingSampler<int, 10>& sampler, int expected_num, } TEST(CountersTest, StreamingSampler) { - StreamingSampler<int, 10> sampler; + StreamingSampler<int, 10> sampler(500); int idx = 0; for (int i = 0; i < 3; ++i) { @@ -1623,6 +1623,7 @@ TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) { RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); const TimeSeriesTestParam& param = GetParam(); + FLAGS_periodic_counter_update_period_ms = 500; const int test_period = FLAGS_periodic_counter_update_period_ms; // Add a counter with a sample function that counts up, starting from 0. diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index 9b7c2b407..0b5ec53ca 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -53,6 +53,7 @@ DECLARE_int32(status_report_interval_ms); DECLARE_int32(periodic_counter_update_period_ms); +DECLARE_int32(periodic_system_counter_update_period_ms); // This must be set on the coordinator to enable the alternative profile representation. 
// It should not be set on executors - the setting is sent to the executors by @@ -2018,12 +2019,19 @@ RuntimeProfileBase::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter( } RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter( - const string& name, TUnit::type unit, SampleFunction fn) { + const string& name, TUnit::type unit, SampleFunction fn, bool is_system) { DCHECK(fn != nullptr); lock_guard<SpinLock> l(counter_map_lock_); TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name); if (it != time_series_counter_map_.end()) return it->second; - TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn)); + int32_t update_interval = is_system ? + FLAGS_periodic_system_counter_update_period_ms : + FLAGS_periodic_counter_update_period_ms; + TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, + unit, fn, update_interval)); + if (is_system) { + counter->SetIsSystem(); + } time_series_counter_map_[name] = counter; PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter); has_active_periodic_counters_ = true; @@ -2031,10 +2039,10 @@ RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter( } RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter( - const string& name, Counter* src_counter) { + const string& name, Counter* src_counter, bool is_system) { DCHECK(src_counter != NULL); return AddSamplingTimeSeriesCounter(name, src_counter->unit(), - bind(&Counter::value, src_counter)); + bind(&Counter::value, src_counter), is_system); } void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) { diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index c6b8fece6..419b8d916 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -99,7 +99,8 @@ class RuntimeProfileBase { class Counter { public: - Counter(TUnit::type unit, int64_t value = 0) : value_(value), unit_(unit) 
{} + Counter(TUnit::type unit, int64_t value = 0) + : value_(value), unit_(unit), is_system_(false) {} virtual ~Counter(){} virtual void Add(int64_t delta) { @@ -144,11 +145,15 @@ class RuntimeProfileBase { TUnit::type unit() const { return unit_; } + void SetIsSystem() { is_system_ = true; } + bool GetIsSystem() const { return is_system_; } + protected: friend class RuntimeProfile; AtomicInt64 value_; TUnit::type unit_; + bool is_system_; }; class AveragedCounter; @@ -658,11 +663,11 @@ class RuntimeProfile : public RuntimeProfileBase { /// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing. /// Note: these counters don't get merged (to make average profiles) TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, - TUnit::type unit, SampleFunction sample_fn); + TUnit::type unit, SampleFunction sample_fn, bool is_system = false); /// Same as above except the samples are collected from 'src_counter'. TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name, Counter* - src_counter); + src_counter, bool is_system = false); /// Adds a chunked time series counter to the profile. This begins sampling immediately. /// This counter will collect new samples periodically by calling 'sample_fn()'. 
Samples diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h index b395ace5d..cbfd2f204 100644 --- a/be/src/util/streaming-sampler.h +++ b/be/src/util/streaming-sampler.h @@ -37,7 +37,7 @@ template<typename T, int MAX_SAMPLES> class StreamingSampler { static_assert(std::is_arithmetic<T>::value, "Numerical type required"); public: - StreamingSampler(int initial_period = 500) + StreamingSampler(int initial_period) : samples_collected_(0) , period_(initial_period), current_sample_sum_(0), diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 3808eb579..39ba7f896 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -371,7 +371,7 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 74: optional string client_identifier; - 75: optional double resource_trace_ratio = 0; + 75: optional double resource_trace_ratio = 1; // See comment in ImpalaService.thrift. // The default value is set to 3 as this is the default value of HDFS replicas. 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index 67f5ca282..d6cd299c6 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -592,10 +592,10 @@ class TestObservability(ImpalaTestSuite): assert any(expected_str in line for line in profile.splitlines()) def test_query_profile_host_resource_metrics_off(self): - """Tests that the query profile does not contain resource usage metrics by default or + """Tests that the query profile does not contain resource usage metrics when disabled explicitly.""" query = "select count(*), sleep(1000) from functional.alltypes" - for query_opts in [None, {'resource_trace_ratio': 0.0}]: + for query_opts in [{'resource_trace_ratio': 0.0}]: profile = self.execute_query(query, query_opts).runtime_profile # Assert that no host resource counters exist in the profile for line in profile.splitlines(): @@ -604,23 +604,24 @@ class TestObservability(ImpalaTestSuite): assert not re.search("HostDiskReadThroughput", line) def test_query_profile_contains_host_resource_metrics(self): - """Tests that the query profile contains various CPU and network metrics.""" - query_opts = {'resource_trace_ratio': 1.0} + """Tests that the query profile contains various CPU and network metrics + by default or when enabled explicitly.""" query = "select count(*), sleep(1000) from functional.alltypes" - profile = self.execute_query(query, query_opts).runtime_profile - # We check for 500ms because a query with 1s duration won't hit the 64 values limit - # that would trigger resampling. - expected_strs = ["HostCpuIoWaitPercentage (500.000ms):", - "HostCpuSysPercentage (500.000ms):", - "HostCpuUserPercentage (500.000ms):", - "HostNetworkRx (500.000ms):", - "HostNetworkTx (500.000ms):", - "HostDiskReadThroughput (500.000ms):", - "HostDiskWriteThroughput (500.000ms):"] - - # Assert that all expected counters exist in the profile. 
- for expected_str in expected_strs: - assert any(expected_str in line for line in profile.splitlines()), expected_str + for query_opts in [{}, {'resource_trace_ratio': 1.0}]: + profile = self.execute_query(query, query_opts).runtime_profile + # We check for 50ms because a query with 1s duration won't hit the 64 values limit + # that would trigger resampling. + expected_strs = ["HostCpuIoWaitPercentage (50.000ms):", + "HostCpuSysPercentage (50.000ms):", + "HostCpuUserPercentage (50.000ms):", + "HostNetworkRx (50.000ms):", + "HostNetworkTx (50.000ms):", + "HostDiskReadThroughput (50.000ms):", + "HostDiskWriteThroughput (50.000ms):"] + + # Assert that all expected counters exist in the profile. + for expected_str in expected_strs: + assert any(expected_str in line for line in profile.splitlines()), expected_str # Check that there are some values for each counter. for line in profile.splitlines():
