This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new be30bd1e403 [improvement](spinlock) remove some potential bad spinlock
usage (#27904)
be30bd1e403 is described below
commit be30bd1e40305ec05cb2bda84879eb52869e906a
Author: yiguolei <[email protected]>
AuthorDate: Sat Dec 2 20:33:54 2023 +0800
[improvement](spinlock) remove some potential bad spinlock usage (#27904)
* [improvement](spinlock) remove some potential spinlock usage
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/runtime/query_statistics.cpp | 6 +-
be/src/runtime/query_statistics.h | 4 +-
be/src/runtime/runtime_filter_mgr.cpp | 1 -
be/src/runtime/user_function_cache.cpp | 1 -
be/src/util/metrics.cpp | 36 ++++----
be/src/util/metrics.h | 23 +++--
be/src/util/streaming_sampler.h | 150 ---------------------------------
7 files changed, 34 insertions(+), 187 deletions(-)
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index 7630362f5eb..02789e2dabe 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -116,7 +116,7 @@ QueryStatistics::~QueryStatistics() {
}
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int
sender_id) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_query_statistics.contains(sender_id)) {
_query_statistics[sender_id] = std::make_shared<QueryStatistics>();
}
@@ -126,12 +126,12 @@ void QueryStatisticsRecvr::insert(const PQueryStatistics&
statistics, int sender
void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int
sender_id) {
if (!statistics->collected()) return;
if (_query_statistics.contains(sender_id)) return;
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_query_statistics[sender_id] = statistics;
}
QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto it = _query_statistics.find(sender_id);
if (it != _query_statistics.end()) {
return it->second;
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index 4923e727961..fa39c9ea183 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -157,14 +157,14 @@ private:
friend class QueryStatistics;
void merge(QueryStatistics* statistics) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
for (auto& pair : _query_statistics) {
statistics->merge(*(pair.second));
}
}
std::map<int, QueryStatisticsPtr> _query_statistics;
- SpinLock _lock;
+ std::mutex _lock;
};
} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index cedfe8d1a06..bca128c652a 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -313,7 +313,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
std::to_string(request->filter_id()));
}
}
- // iter->second = pair{CntlVal,SpinLock}
cntVal = iter->second.first;
{
std::lock_guard<std::mutex> l(*iter->second.second);
diff --git a/be/src/runtime/user_function_cache.cpp
b/be/src/runtime/user_function_cache.cpp
index c2b25971587..ab9d90846ab 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -91,7 +91,6 @@ struct UserFunctionCacheEntry {
// used to lookup a symbol
void* lib_handle = nullptr;
- SpinLock map_lock;
// from symbol_name to function pointer
std::unordered_map<std::string, void*> fptr_map;
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index de2d43c8548..23dbb628a0d 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -123,7 +123,7 @@ std::string Metric::to_prometheus(const std::string&
display_name, const Labels&
std::map<std::string, double> HistogramMetric::_s_output_percentiles = {
{"0.50", 50.0}, {"0.75", 75.0}, {"0.90", 90.0}, {"0.95", 95.0},
{"0.99", 99.0}};
void HistogramMetric::clear() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_stats.clear();
}
@@ -136,12 +136,12 @@ void HistogramMetric::add(const uint64_t& value) {
}
void HistogramMetric::merge(const HistogramMetric& other) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_stats.merge(other._stats);
}
void HistogramMetric::set_histogram(const HistogramStat& stats) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_stats.clear();
_stats.merge(stats);
}
@@ -228,7 +228,7 @@ std::string MetricPrototype::to_prometheus(const
std::string& registry_name) con
}
void MetricEntity::deregister_metric(const MetricPrototype* metric_type) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto metric = _metrics.find(metric_type);
if (metric != _metrics.end()) {
delete metric->second;
@@ -238,7 +238,7 @@ void MetricEntity::deregister_metric(const MetricPrototype*
metric_type) {
Metric* MetricEntity::get_metric(const std::string& name, const std::string&
group_name) const {
MetricPrototype dummy(MetricType::UNTYPED, MetricUnit::NOUNIT, name, "",
group_name);
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto it = _metrics.find(&dummy);
if (it == _metrics.end()) {
return nullptr;
@@ -247,7 +247,7 @@ Metric* MetricEntity::get_metric(const std::string& name,
const std::string& gro
}
void MetricEntity::register_hook(const std::string& name, const
std::function<void()>& hook) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
#ifndef BE_TEST
DCHECK(_hooks.find(name) == _hooks.end()) << "hook is already exist! " <<
_name << ":" << name;
#endif
@@ -255,7 +255,7 @@ void MetricEntity::register_hook(const std::string& name,
const std::function<vo
}
void MetricEntity::deregister_hook(const std::string& name) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_hooks.erase(name);
}
@@ -276,7 +276,7 @@ std::shared_ptr<MetricEntity>
MetricRegistry::register_entity(const std::string&
const Labels&
labels,
MetricEntityType
type) {
std::shared_ptr<MetricEntity> entity =
std::make_shared<MetricEntity>(type, name, labels);
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto inserted_entity = _entities.insert(std::make_pair(entity, 1));
if (!inserted_entity.second) {
// If exist, increase the registered count
@@ -286,7 +286,7 @@ std::shared_ptr<MetricEntity>
MetricRegistry::register_entity(const std::string&
}
void MetricRegistry::deregister_entity(const std::shared_ptr<MetricEntity>&
entity) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto found_entity = _entities.find(entity);
if (found_entity != _entities.end()) {
// Decrease the registered count
@@ -303,7 +303,7 @@ std::shared_ptr<MetricEntity>
MetricRegistry::get_entity(const std::string& name
MetricEntityType
type) {
std::shared_ptr<MetricEntity> dummy = std::make_shared<MetricEntity>(type,
name, labels);
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto entity = _entities.find(dummy);
if (entity == _entities.end()) {
return std::shared_ptr<MetricEntity>();
@@ -312,9 +312,9 @@ std::shared_ptr<MetricEntity>
MetricRegistry::get_entity(const std::string& name
}
void MetricRegistry::trigger_all_hooks(bool force) const {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
for (const auto& entity : _entities) {
- std::lock_guard<SpinLock> l(entity.first->_lock);
+ std::lock_guard<std::mutex> l(entity.first->_lock);
entity.first->trigger_hook_unlocked(force);
}
}
@@ -322,12 +322,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const {
std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const {
// Reorder by MetricPrototype
EntityMetricsByType entity_metrics_by_types;
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
for (const auto& entity : _entities) {
if (entity.first->_type == MetricEntityType::kTablet &&
!with_tablet_metrics) {
continue;
}
- std::lock_guard<SpinLock> l(entity.first->_lock);
+ std::lock_guard<std::mutex> l(entity.first->_lock);
entity.first->trigger_hook_unlocked(false);
for (const auto& metric : entity.first->_metrics) {
std::pair<MetricEntity*, Metric*> new_elem =
@@ -365,12 +365,12 @@ std::string MetricRegistry::to_prometheus(bool
with_tablet_metrics) const {
std::string MetricRegistry::to_json(bool with_tablet_metrics) const {
rj::Document doc {rj::kArrayType};
rj::Document::AllocatorType& allocator = doc.GetAllocator();
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
for (const auto& entity : _entities) {
if (entity.first->_type == MetricEntityType::kTablet &&
!with_tablet_metrics) {
continue;
}
- std::lock_guard<SpinLock> l(entity.first->_lock);
+ std::lock_guard<std::mutex> l(entity.first->_lock);
entity.first->trigger_hook_unlocked(false);
for (const auto& metric : entity.first->_metrics) {
rj::Value metric_obj(rj::kObjectType);
@@ -406,9 +406,9 @@ std::string MetricRegistry::to_json(bool
with_tablet_metrics) const {
std::string MetricRegistry::to_core_string() const {
std::stringstream ss;
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
for (const auto& entity : _entities) {
- std::lock_guard<SpinLock> l(entity.first->_lock);
+ std::lock_guard<std::mutex> l(entity.first->_lock);
entity.first->trigger_hook_unlocked(false);
for (const auto& metric : entity.first->_metrics) {
if (metric.first->is_core_metric) {
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 6007562eb05..ac7e69a4ef8 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -35,7 +35,6 @@
#include "util/core_local.h"
#include "util/histogram.h"
-#include "util/spinlock.h"
namespace doris {
@@ -111,17 +110,17 @@ public:
std::string to_string() const override { return std::to_string(value()); }
T value() const {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _value;
}
void increment(const T& delta) {
- std::lock_guard<SpinLock> l(this->_lock);
+ std::lock_guard<std::mutex> l(this->_lock);
_value += delta;
}
void set_value(const T& value) {
- std::lock_guard<SpinLock> l(this->_lock);
+ std::lock_guard<std::mutex> l(this->_lock);
_value = value;
}
@@ -130,14 +129,14 @@ public:
}
protected:
- // We use spinlock instead of std::atomic is because atomic don't support
+ // We use std::mutex instead of std::atomic because atomic doesn't support
// double's fetch_add
// TODO(zc): If this is atomic is bottleneck, we change to thread local.
// performance: on Intel(R) Xeon(R) CPU E5-2450 int64_t
// original type: 2ns/op
- // single thread spinlock: 26ns/op
- // multiple thread(8) spinlock: 2500ns/op
- mutable SpinLock _lock;
+ // single thread (old SpinLock, not re-measured with std::mutex): 26ns/op
+ // multiple thread(8) (old SpinLock, not re-measured with std::mutex): 2500ns/op
+ mutable std::mutex _lock;
T _value;
};
@@ -202,7 +201,7 @@ public:
protected:
static std::map<std::string, double> _s_output_percentiles;
- mutable SpinLock _lock;
+ mutable std::mutex _lock;
HistogramStat _stats;
};
@@ -351,7 +350,7 @@ public:
template <typename T>
Metric* register_metric(const MetricPrototype* metric_type) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
auto inserted_metric = _metrics.insert(std::make_pair(metric_type,
nullptr));
if (inserted_metric.second) {
// If not exist, make a new metric pointer
@@ -377,7 +376,7 @@ private:
std::string _name;
Labels _labels;
- mutable SpinLock _lock;
+ mutable std::mutex _lock;
MetricMap _metrics;
std::map<std::string, std::function<void()>> _hooks;
};
@@ -421,7 +420,7 @@ public:
private:
const std::string _name;
- mutable SpinLock _lock;
+ mutable std::mutex _lock;
// MetricEntity -> register count
std::unordered_map<std::shared_ptr<MetricEntity>, int32_t,
MetricEntityHash,
MetricEntityEqualTo>
diff --git a/be/src/util/streaming_sampler.h b/be/src/util/streaming_sampler.h
deleted file mode 100644
index f5b727ac430..00000000000
--- a/be/src/util/streaming_sampler.h
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <string.h>
-
-#include <iostream>
-
-#include "util/spinlock.h"
-
-namespace doris {
-
-/// A fixed-size sampler to collect samples over time. AddSample should be
-/// called periodically with the sampled value. Samples are added at the max
-/// resolution possible. When the sample buffer is full, the current samples
-/// are collapsed and the collection period is doubled.
-/// The input period and the streaming sampler period do not need to match, the
-/// streaming sampler will average values.
-/// T is the type of the sample and must be a native numerical type (e.g. int
or float).
-template <typename T, int MAX_SAMPLES>
-class StreamingSampler {
-public:
- StreamingSampler(int initial_period = 500)
- : samples_collected_(0),
- period_(initial_period),
- current_sample_sum_(0),
- current_sample_count_(0),
- current_sample_total_time_(0) {}
-
- /// Initialize the sampler with values.
- StreamingSampler(int period, const std::vector<T>& initial_samples)
- : samples_collected_(initial_samples.size()),
- period_(period),
- current_sample_sum_(0),
- current_sample_count_(0),
- current_sample_total_time_(0) {
- DCHECK_LE(samples_collected_, MAX_SAMPLES);
- memcpy(samples_, &initial_samples[0], sizeof(T) * samples_collected_);
- }
-
- /// Add a sample to the sampler. 'ms' is the time elapsed since the last
time this
- /// was called.
- /// The input value is accumulated into current_*. If the total time
elapsed
- /// in current_sample_total_time_ is higher than the storage period, the
value is
- /// stored. 'sample' should be interpreted as a representative sample from
- /// (now - ms, now].
- /// TODO: we can make this more complex by taking a weighted average of
samples
- /// accumulated in a period.
- void AddSample(T sample, int ms) {
- std::lock_guard<SpinLock> l(lock_);
- ++current_sample_count_;
- current_sample_sum_ += sample;
- current_sample_total_time_ += ms;
-
- if (current_sample_total_time_ >= period_) {
- samples_[samples_collected_++] = current_sample_sum_ /
current_sample_count_;
- current_sample_count_ = 0;
- current_sample_sum_ = 0;
- current_sample_total_time_ = 0;
-
- if (samples_collected_ == MAX_SAMPLES) {
- /// collapse the samples in half by averaging them and
doubling the storage period
- period_ *= 2;
- for (int i = 0; i < MAX_SAMPLES / 2; ++i) {
- samples_[i] = (samples_[i * 2] + samples_[i * 2 + 1]) / 2;
- }
- samples_collected_ /= 2;
- }
- }
- }
-
- /// Get the samples collected. Returns the number of samples and
- /// the period they were collected at.
- /// If lock is non-null, the lock will be taken before returning. The
caller
- /// must unlock it.
- const T* GetSamples(int* num_samples, int* period, SpinLock** lock =
nullptr) const {
- if (lock != nullptr) {
- lock_.lock();
- *lock = &lock_;
- }
- *num_samples = samples_collected_;
- *period = period_;
- return samples_;
- }
-
- /// Set the underlying data to period/samples
- void SetSamples(int period, const std::vector<T>& samples) {
- DCHECK_LE(samples.size(), MAX_SAMPLES);
-
- std::lock_guard<SpinLock> l(lock_);
- period_ = period;
- samples_collected_ = samples.size();
- memcpy(samples_, &samples[0], sizeof(T) * samples_collected_);
- current_sample_sum_ = 0;
- current_sample_count_ = 0;
- current_sample_total_time_ = 0;
- }
-
- std::string DebugString(const std::string& prefix = "") const {
- std::lock_guard<SpinLock> l(lock_);
- std::stringstream ss;
- ss << prefix << "Period = " << period_ << std::endl
- << prefix << "Num = " << samples_collected_ << std::endl
- << prefix << "Samples = {";
- for (int i = 0; i < samples_collected_; ++i) {
- ss << samples_[i] << ", ";
- }
- ss << prefix << "}" << std::endl;
- return ss.str();
- }
-
-private:
- mutable SpinLock lock_;
-
- /// Aggregated samples collected. Note: this is not all the input samples
from
- /// AddSample(), as logically, those samples get resampled and aggregated.
- T samples_[MAX_SAMPLES];
-
- /// Number of samples collected <= MAX_SAMPLES.
- int samples_collected_;
-
- /// Storage period in ms.
- int period_;
-
- /// The sum of input samples that makes up the next stored sample.
- T current_sample_sum_;
-
- /// The number of input samples that contribute to current_sample_sum_.
- int current_sample_count_;
-
- /// The total time that current_sample_sum_ represents
- int current_sample_total_time_;
-};
-
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]