This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e450a7d7e16 [branch-3.0](lock) Remove SpinLock completely (#49972)
e450a7d7e16 is described below
commit e450a7d7e16f715e8244131e04c3e8fe70127ef1
Author: zclllyybb <[email protected]>
AuthorDate: Tue Apr 22 11:47:21 2025 +0800
[branch-3.0](lock) Remove SpinLock completely (#49972)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: https://github.com/apache/doris/pull/49872
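Problem Summary: Replace the remaining uses of doris::SpinLock with std::mutex, delete be/src/util/spinlock.h, and prune includes in the touched files along the way.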
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [x] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/cloud/cloud_tablets_channel.cpp | 2 +-
be/src/common/object_pool.h | 12 ++---
be/src/olap/delta_writer.h | 1 -
be/src/olap/delta_writer_v2.h | 1 -
be/src/olap/memtable_writer.cpp | 18 +++----
be/src/olap/memtable_writer.h | 6 +--
be/src/olap/rowset/beta_rowset_writer.h | 5 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 19 ++-----
be/src/olap/rowset/segment_creator.h | 6 ---
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 9 ----
be/src/olap/storage_engine.cpp | 6 +--
be/src/pipeline/exec/data_queue.h | 3 +-
be/src/runtime/load_channel.cpp | 3 +-
be/src/runtime/load_channel.h | 3 +-
be/src/runtime/load_stream_writer.h | 1 -
be/src/runtime/query_statistics.h | 7 +--
be/src/runtime/record_batch_queue.cpp | 3 +-
be/src/runtime/record_batch_queue.h | 5 +-
be/src/runtime/tablets_channel.cpp | 8 +--
be/src/runtime/tablets_channel.h | 3 +-
be/src/runtime/user_function_cache.cpp | 1 -
be/src/util/lru_multi_cache.h | 38 ++++++-------
be/src/util/lru_multi_cache.inline.h | 24 ++++-----
be/src/util/spinlock.h | 62 ----------------------
be/src/util/trace.h | 1 -
be/src/util/uuid_generator.h | 8 +--
be/src/vec/sink/writer/vtablet_writer.cpp | 12 ++---
be/src/vec/sink/writer/vtablet_writer.h | 7 ++-
be/test/util/threadpool_test.cpp | 9 ++--
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 1 -
30 files changed, 75 insertions(+), 209 deletions(-)
diff --git a/be/src/cloud/cloud_tablets_channel.cpp
b/be/src/cloud/cloud_tablets_channel.cpp
index 85b8e3ea33a..c38680a81dc 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -62,7 +62,7 @@ Status CloudTabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& reques
{
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
for (auto& [tablet_id, _] : tablet_to_rowidxs) {
auto tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h
index 3c000371a9c..89ab0442e0b 100644
--- a/be/src/common/object_pool.h
+++ b/be/src/common/object_pool.h
@@ -20,8 +20,6 @@
#include <mutex>
#include <vector>
-#include "util/spinlock.h"
-
namespace doris {
// An ObjectPool maintains a list of C++ objects which are deallocated
@@ -36,20 +34,20 @@ public:
template <class T>
T* add(T* t) {
// TODO: Consider using a lock-free structure.
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_objects.emplace_back(Element {t, [](void* obj) { delete
reinterpret_cast<T*>(obj); }});
return t;
}
template <class T>
T* add_array(T* t) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
_objects.emplace_back(Element {t, [](void* obj) { delete[]
reinterpret_cast<T*>(obj); }});
return t;
}
void clear() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
// reverse delete object to make sure the obj can
// safe access the member object construt early by
// object pool
@@ -65,7 +63,7 @@ public:
}
uint64_t size() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
return _objects.size();
}
@@ -83,7 +81,7 @@ private:
};
std::vector<Element> _objects;
- SpinLock _lock;
+ std::mutex _lock;
};
} // namespace doris
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 20f693d5e4d..412954d634d 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -36,7 +36,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
-#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index e4506ea0d23..bdab87058cb 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -39,7 +39,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
-#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 56312073c57..0710d8c4071 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -19,10 +19,8 @@
#include <fmt/format.h>
-#include <filesystem>
#include <ostream>
#include <string>
-#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
@@ -40,8 +38,6 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
-#include "runtime/memory/mem_tracker.h"
-#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
@@ -152,12 +148,12 @@ Status MemTableWriter::_flush_memtable_async() {
DCHECK(_flush_token != nullptr);
std::shared_ptr<MemTable> memtable;
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
memtable = _mem_table;
_mem_table = nullptr;
}
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
memtable->update_mem_type(MemType::WRITE_FINISHED);
_freezed_mem_tables.push_back(memtable);
}
@@ -211,7 +207,7 @@ Status MemTableWriter::wait_flush() {
void MemTableWriter::_reset_mem_table() {
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema,
_req.slots, _req.tuple_desc,
_unique_key_mow,
_partial_update_info.get()));
}
@@ -237,7 +233,7 @@ Status MemTableWriter::close() {
auto s = _flush_memtable_async();
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset();
}
_is_closed = true;
@@ -336,7 +332,7 @@ Status MemTableWriter::cancel_with_status(const Status& st)
{
return Status::OK();
}
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset();
}
if (_flush_token != nullptr) {
@@ -364,7 +360,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
}
int64_t mem_usage = 0;
{
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
for (const auto& mem_table : _freezed_mem_tables) {
auto mem_table_sptr = mem_table.lock();
if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() ==
mem) {
@@ -376,7 +372,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
}
int64_t MemTableWriter::active_memtable_mem_consumption() {
- std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+ std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
}
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 6065cbc1a12..1a1a3e3af2c 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -25,8 +25,6 @@
#include <cstdint>
#include <memory>
#include <mutex>
-#include <shared_mutex>
-#include <unordered_set>
#include <vector>
#include "common/status.h"
@@ -38,8 +36,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
-#include "util/spinlock.h"
-#include "util/uid_util.h"
namespace doris {
@@ -130,7 +126,7 @@ private:
// Save the not active memtable that is in flush queue or under flushing.
std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
// The lock to protect _memtable and _freezed_mem_tables structure to
avoid concurrency modification or read
- SpinLock _mem_table_ptr_lock;
+ std::mutex _mem_table_ptr_lock;
QueryThreadContext _query_thread_context;
std::mutex _lock;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 07be53647a3..2ae999eae20 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -44,7 +44,6 @@
#include "olap/rowset/segment_creator.h"
#include "segment_v2/inverted_index_file_writer.h"
#include "segment_v2/segment.h"
-#include "util/spinlock.h"
namespace doris {
namespace vectorized {
@@ -80,7 +79,7 @@ public:
}
private:
- mutable SpinLock _lock;
+ mutable std::mutex _lock;
std::unordered_map<int /* seg_id */, io::FileWriterPtr> _file_writers;
bool _closed {false};
};
@@ -109,7 +108,7 @@ public:
int64_t get_total_index_size() const { return _total_size; }
private:
- mutable SpinLock _lock;
+ mutable std::mutex _lock;
std::unordered_map<int /* seg_id */, InvertedIndexFileWriterPtr>
_inverted_index_file_writers;
int64_t _total_size = 0;
};
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index cf658b38bda..5aafb755e7f 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -19,33 +19,20 @@
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <algorithm>
-#include <atomic>
-#include <condition_variable>
-#include <map>
+#include <cstdint>
#include <memory>
#include <mutex>
-#include <optional>
#include <roaring/roaring.hh>
-#include <string>
-#include <unordered_set>
#include <vector>
-#include "brpc/controller.h"
-#include "brpc/stream.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset.h"
-#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_creator.h"
-#include "segment_v2/segment.h"
-#include "util/spinlock.h"
namespace doris {
namespace vectorized {
@@ -120,7 +107,7 @@ public:
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const
override {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
*segment_num_rows = _segment_num_rows;
return Status::OK();
}
@@ -145,7 +132,7 @@ public:
}
private:
- mutable SpinLock _lock; // protect following vectors.
+ mutable std::mutex _lock; // protect following vectors.
// record rows number of every segment already written, using for rowid
// conversion when compaction in unique key with MoW model
std::vector<uint32_t> _segment_num_rows;
diff --git a/be/src/olap/rowset/segment_creator.h
b/be/src/olap/rowset/segment_creator.h
index f8afd579892..351807cf279 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -20,18 +20,12 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>
-#include <string>
-#include <typeinfo>
-#include <unordered_map>
-#include <vector>
-
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
#include "olap/tablet_fwd.h"
-#include "util/spinlock.h"
#include "vec/core/block.h"
namespace doris {
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 46070f8dccd..2a722452876 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -20,24 +20,15 @@
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
-#include <algorithm>
#include <atomic>
#include <memory>
-#include <mutex>
#include <ostream>
-#include <string>
#include <utility>
#include "cloud/cloud_rowset_writer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "olap/rowset/beta_rowset.h"
-#include "olap/rowset/rowset_meta.h"
-#include "olap/rowset/rowset_writer_context.h"
#include "util/slice.h"
-#include "util/spinlock.h"
#include "vec/core/block.h"
namespace doris {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 1290ca29090..79da354409e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -56,7 +56,6 @@
#include "olap/memtable_flush_executor.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
-#include "olap/olap_meta.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_meta_manager.h"
@@ -72,7 +71,6 @@
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
-#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -300,7 +298,7 @@ Status StorageEngine::_open() {
Status StorageEngine::_init_store_map() {
std::vector<std::thread> threads;
- SpinLock error_msg_lock;
+ std::mutex error_msg_lock;
std::string error_msg;
for (auto& path : _options.store_paths) {
auto store = std::make_unique<DataDir>(*this, path.path,
path.capacity_bytes,
@@ -309,7 +307,7 @@ Status StorageEngine::_init_store_map() {
auto st = store->init();
if (!st.ok()) {
{
- std::lock_guard<SpinLock> l(error_msg_lock);
+ std::lock_guard<std::mutex> l(error_msg_lock);
error_msg.append(st.to_string() + ";");
}
LOG(WARNING) << "Store load failed, status=" << st.to_string()
diff --git a/be/src/pipeline/exec/data_queue.h
b/be/src/pipeline/exec/data_queue.h
index f5bd84cc278..7dc4dcd04d0 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -25,7 +25,6 @@
#include <vector>
#include "common/status.h"
-#include "util/spinlock.h"
#include "vec/core/block.h"
namespace doris::pipeline {
@@ -105,7 +104,7 @@ private:
// data queue is multi sink one source
std::shared_ptr<Dependency> _source_dependency = nullptr;
std::vector<Dependency*> _sink_dependencies;
- SpinLock _source_lock;
+ std::mutex _source_lock;
};
} // namespace doris::pipeline
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 6dfd5d46eb6..54665d8184f 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -26,7 +26,6 @@
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
-#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
@@ -271,7 +270,7 @@ void
LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
ThriftSerializer ser(false, 4096);
uint8_t* buf = nullptr;
uint32_t len = 0;
- std::lock_guard<SpinLock> l(_profile_serialize_lock);
+ std::lock_guard<std::mutex> l(_profile_serialize_lock);
_profile->to_thrift(&tprofile);
auto st = ser.serialize(&tprofile, &len, &buf);
if (st.ok()) {
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 2889bcf2565..8b074245d46 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -30,7 +30,6 @@
#include "common/status.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
-#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
@@ -87,7 +86,7 @@ private:
UniqueId _load_id;
int64_t _txn_id = 0;
- SpinLock _profile_serialize_lock;
+ std::mutex _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index 8815b0f0e3e..a23882f017c 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -35,7 +35,6 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
-#include "util/spinlock.h"
#include "util/uid_util.h"
namespace doris {
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index 0a19dfd46f0..530f9a198dc 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -21,13 +21,8 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <stdint.h>
-#include <map>
+#include <atomic>
#include <memory>
-#include <mutex>
-#include <unordered_map>
-#include <utility>
-
-#include "util/spinlock.h"
namespace doris {
diff --git a/be/src/runtime/record_batch_queue.cpp
b/be/src/runtime/record_batch_queue.cpp
index 25db550db3a..042c30602c5 100644
--- a/be/src/runtime/record_batch_queue.cpp
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -18,7 +18,6 @@
#include "runtime/record_batch_queue.h"
#include "pipeline/dependency.h"
-#include "util/spinlock.h"
namespace doris {
@@ -41,7 +40,7 @@ void RecordBatchQueue::update_status(const Status& status) {
return;
}
{
- std::lock_guard<SpinLock> l(_status_lock);
+ std::lock_guard<std::mutex> l(_status_lock);
if (_status.ok()) {
_status = status;
}
diff --git a/be/src/runtime/record_batch_queue.h
b/be/src/runtime/record_batch_queue.h
index a8e8c80c91c..c61243237eb 100644
--- a/be/src/runtime/record_batch_queue.h
+++ b/be/src/runtime/record_batch_queue.h
@@ -18,7 +18,6 @@
#pragma once
#include <sys/types.h>
-#include <util/spinlock.h>
#include <memory>
#include <mutex>
@@ -46,7 +45,7 @@ public:
RecordBatchQueue(u_int32_t max_elements) : _queue(max_elements) {}
Status status() {
- std::lock_guard<SpinLock> l(_status_lock);
+ std::lock_guard<std::mutex> l(_status_lock);
return _status;
}
@@ -66,7 +65,7 @@ public:
private:
BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
- SpinLock _status_lock;
+ std::mutex _status_lock;
Status _status;
std::shared_ptr<pipeline::Dependency> _dep = nullptr;
};
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 4d458cd440f..0e3b2c33e8a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -240,7 +240,7 @@ Status BaseTabletsChannel::incremental_open(const
PTabletWriterOpenRequest& para
auto delta_writer = create_delta_writer(wrequest);
{
// here we modify _tablet_writers. so need lock.
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
_tablet_writers.emplace(tablet.tablet_id(),
std::move(delta_writer));
}
@@ -444,7 +444,7 @@ void BaseTabletsChannel::refresh_profile() {
int64_t max_tablet_write_mem_usage = 0;
int64_t max_tablet_flush_mem_usage = 0;
{
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
for (auto&& [tablet_id, writer] : _tablet_writers) {
int64_t write_mem =
writer->mem_consumption(MemType::WRITE_FINISHED);
write_mem_usage += write_mem;
@@ -520,7 +520,7 @@ Status BaseTabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& req
auto delta_writer = create_delta_writer(wrequest);
{
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
_tablet_writers.emplace(tablet.tablet_id(),
std::move(delta_writer));
}
}
@@ -584,7 +584,7 @@ Status BaseTabletsChannel::_write_block_data(
// so need to protect it with _tablet_writers_lock.
decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
{
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError("unknown tablet to append data,
tablet={}", tablet_id);
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 87fbf9d06aa..4443737248f 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -32,7 +32,6 @@
#include "common/status.h"
#include "util/bitmap.h"
#include "util/runtime_profile.h"
-#include "util/spinlock.h"
#include "util/uid_util.h"
namespace google::protobuf {
@@ -173,7 +172,7 @@ protected:
// tablet_id -> TabletChannel. it will only be changed in open() or
inc_open()
std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>>
_tablet_writers;
// protect _tablet_writers
- SpinLock _tablet_writers_lock;
+ std::mutex _tablet_writers_lock;
// broken tablet ids.
// If a tablet write fails, it's id will be added to this set.
// So that following batch will not handle this tablet anymore.
diff --git a/be/src/runtime/user_function_cache.cpp
b/be/src/runtime/user_function_cache.cpp
index a5f354180b8..e058b76ad33 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -42,7 +42,6 @@
#include "runtime/exec_env.h"
#include "util/dynamic_util.h"
#include "util/md5.h"
-#include "util/spinlock.h"
#include "util/string_util.h"
namespace doris {
diff --git a/be/src/util/lru_multi_cache.h b/be/src/util/lru_multi_cache.h
index 8c810a06ee2..ccf4e86f323 100644
--- a/be/src/util/lru_multi_cache.h
+++ b/be/src/util/lru_multi_cache.h
@@ -22,16 +22,10 @@
#pragma once
#include <boost/intrusive/list.hpp>
-#include <functional>
#include <list>
-#include <memory>
#include <mutex>
-#include <tuple>
#include <unordered_map>
-#include "gutil/macros.h"
-#include "util/spinlock.h"
-
namespace doris {
/// LruMultiCache is a threadsafe Least Recently Used Cache built on
std::unordered_map
@@ -99,7 +93,8 @@ public:
LruMultiCache(LruMultiCache&&) = delete;
LruMultiCache& operator=(LruMultiCache&&) = delete;
- DISALLOW_COPY_AND_ASSIGN(LruMultiCache);
+ LruMultiCache(const LruMultiCache&) = delete;
+ const LruMultiCache& operator=(const LruMultiCache&) = delete;
/// Returns the number of stored objects in O(1) time
size_t size();
@@ -130,13 +125,12 @@ public:
private:
/// Doubly linked list and auto_unlink is used for O(1) remove from LRU
list, in case of
/// get and evict.
- typedef boost::intrusive::list_member_hook<
- boost::intrusive::link_mode<boost::intrusive::auto_unlink>>
- link_type;
+ using link_type = boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
/// Internal type storing everything needed for O(1) operations
struct ValueType_internal {
- typedef std::list<ValueType_internal> Container_internal;
+ using Container_internal = std::list<ValueType_internal>;
/// Variadic template is used to support emplace
template <typename... Args>
@@ -171,19 +165,17 @@ private:
};
/// Owning list typedef
- typedef std::list<ValueType_internal> Container;
+ using Container = std::list<ValueType_internal>;
/// Hash table typedef
- typedef std::unordered_map<KeyType, Container> HashTableType;
+ using HashTableType = std::unordered_map<KeyType, Container>;
- typedef boost::intrusive::member_hook<ValueType_internal, link_type,
- &ValueType_internal::member_hook>
- MemberHookOption;
+ using MemberHookOption = boost::intrusive::member_hook<ValueType_internal,
link_type,
+
&ValueType_internal::member_hook>;
/// No constant time size to support self unlink, cache size is tracked by
the class
- typedef boost::intrusive::list<ValueType_internal, MemberHookOption,
- boost::intrusive::constant_time_size<false>>
- LruListType;
+ using LruListType = boost::intrusive::list<ValueType_internal,
MemberHookOption,
+
boost::intrusive::constant_time_size<false>>;
void release(ValueType_internal* p_value_internal);
void destroy(ValueType_internal* p_value_internal);
@@ -201,9 +193,8 @@ private:
size_t _capacity;
size_t _size;
- /// Protects access to cache. No need for read/write cache as there is no
costly
- /// pure read operation
- SpinLock _lock;
+ /// Protects access to cache. No need for read/write cache as there is no
costly pure read operation
+ std::mutex _lock;
public:
/// RAII Accessor to give unqiue access for a cached object
@@ -216,7 +207,8 @@ public:
Accessor(Accessor&&);
Accessor& operator=(Accessor&&);
- DISALLOW_COPY_AND_ASSIGN(Accessor);
+ Accessor(const Accessor&) = delete;
+ const Accessor& operator=(const Accessor&) = delete;
/// Automatic release in destructor
~Accessor();
diff --git a/be/src/util/lru_multi_cache.inline.h
b/be/src/util/lru_multi_cache.inline.h
index 87d09891342..87a910577b4 100644
--- a/be/src/util/lru_multi_cache.inline.h
+++ b/be/src/util/lru_multi_cache.inline.h
@@ -110,25 +110,25 @@ LruMultiCache<KeyType, ValueType>::LruMultiCache(size_t
capacity) : _capacity(ca
template <typename KeyType, typename ValueType>
size_t LruMultiCache<KeyType, ValueType>::size() {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
return _size;
}
template <typename KeyType, typename ValueType>
size_t LruMultiCache<KeyType, ValueType>::number_of_keys() {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
return _hash_table.size();
}
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::set_capacity(size_t new_capacity) {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
_capacity = new_capacity;
}
template <typename KeyType, typename ValueType>
auto LruMultiCache<KeyType, ValueType>::get(const KeyType& key) -> Accessor {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
auto hash_table_it = _hash_table.find(key);
// No owning list found with this key, the caller will have to create a
new object
@@ -160,7 +160,7 @@ template <typename KeyType, typename ValueType>
template <typename... Args>
auto LruMultiCache<KeyType, ValueType>::emplace_and_get(const KeyType& key,
Args&&... args)
-> Accessor {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
// creates default container if there isn't one
Container& container = _hash_table[key];
@@ -186,7 +186,7 @@ auto LruMultiCache<KeyType,
ValueType>::emplace_and_get(const KeyType& key, Args
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::release(ValueType_internal*
p_value_internal) {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
// This only can be used by the accessor, which already checks for nullptr
DCHECK(p_value_internal);
@@ -211,7 +211,7 @@ void LruMultiCache<KeyType,
ValueType>::release(ValueType_internal* p_value_inte
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::destroy(ValueType_internal*
p_value_internal) {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
// This only can be used by the accessor, which already checks for nullptr
DCHECK(p_value_internal);
@@ -234,19 +234,19 @@ void LruMultiCache<KeyType,
ValueType>::destroy(ValueType_internal* p_value_inte
template <typename KeyType, typename ValueType>
size_t LruMultiCache<KeyType, ValueType>::number_of_available_objects() {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
return _lru_list.size();
}
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::rehash() {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
_hash_table.rehash(_hash_table.bucket_count() + 1);
}
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::_evict_one(ValueType_internal&
value_internal) {
- // SpinLock is locked by the caller evicting function
+ // std::mutex is locked by the caller evicting function
// _lock.DCheckLocked();
// Has to be available to evict
@@ -270,7 +270,7 @@ void LruMultiCache<KeyType,
ValueType>::_evict_one(ValueType_internal& value_int
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::_evict_one_if_needed() {
- // SpinLock is locked by the caller public function
+ // std::mutex is locked by the caller public function
// _lock.DCheckLocked();
if (!_lru_list.empty() && _size > _capacity) {
@@ -280,7 +280,7 @@ void LruMultiCache<KeyType,
ValueType>::_evict_one_if_needed() {
template <typename KeyType, typename ValueType>
void LruMultiCache<KeyType, ValueType>::evict_older_than(uint64_t
oldest_allowed_timestamp) {
- std::lock_guard<SpinLock> g(_lock);
+ std::lock_guard<std::mutex> g(_lock);
// Stop eviction if
// - there are no more available (i.e. evictable) objects
diff --git a/be/src/util/spinlock.h b/be/src/util/spinlock.h
deleted file mode 100644
index c0712875fe6..00000000000
--- a/be/src/util/spinlock.h
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/spinlock.h
-// and modified by Doris
-
-#pragma once
-
-#include <sched.h> /* For sched_yield() */
-
-#include <atomic>
-
-namespace doris {
-
-// Lightweight spinlock.
-class SpinLock {
-public:
- SpinLock() : _locked(false) {
- // do nothing
- }
-
- // Acquires the lock, spins until the lock becomes available
- void lock() {
- for (int spin_count = 0; !try_lock(); ++spin_count) {
- if (spin_count < NUM_SPIN_CYCLES) {
-#if (defined(__i386) || defined(__x86_64__))
- asm volatile("pause\n" : : : "memory");
-#elif defined(__aarch64__)
- asm volatile("yield\n" ::: "memory");
-#endif
- } else {
- sched_yield();
- spin_count = 0;
- }
- }
- }
-
- void unlock() { _locked.clear(std::memory_order_release); }
-
- // Tries to acquire the lock
- bool try_lock() { return !_locked.test_and_set(std::memory_order_acquire);
}
-
-private:
- static const int NUM_SPIN_CYCLES = 70;
- std::atomic_flag _locked;
-};
-
-} // end namespace doris
diff --git a/be/src/util/trace.h b/be/src/util/trace.h
index 7a876fd3c3c..f8ebc394b22 100644
--- a/be/src/util/trace.h
+++ b/be/src/util/trace.h
@@ -22,7 +22,6 @@
#include "gutil/strings/substitute.h"
#include "gutil/threading/thread_collision_warner.h"
#include "util/scoped_cleanup.h"
-#include "util/spinlock.h"
#include "util/time.h"
// If this scope times out, make a simple trace.
diff --git a/be/src/util/uuid_generator.h b/be/src/util/uuid_generator.h
index 0a78ca9b8cc..990a62d30e1 100644
--- a/be/src/util/uuid_generator.h
+++ b/be/src/util/uuid_generator.h
@@ -21,17 +21,13 @@
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <mutex>
-#include <ostream>
-#include <string>
-
-#include "util/spinlock.h"
namespace doris {
class UUIDGenerator {
public:
boost::uuids::uuid next_uuid() {
- std::lock_guard<SpinLock> lock(_uuid_gen_lock);
+ std::lock_guard<std::mutex> lock(_uuid_gen_lock);
return _boost_uuid_generator();
}
@@ -42,7 +38,7 @@ public:
private:
boost::uuids::basic_random_generator<boost::mt19937> _boost_uuid_generator;
- SpinLock _uuid_gen_lock;
+ std::mutex _uuid_gen_lock;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index a10d5e46a26..b8bebeb9acf 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -166,7 +166,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::s
}
{
- std::lock_guard<doris::SpinLock> l(_fail_lock);
+ std::lock_guard<std::mutex> l(_fail_lock);
if (tablet_id == -1) {
for (const auto the_tablet_id : it->second) {
_failed_channels[the_tablet_id].insert(node_id);
@@ -189,7 +189,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::s
}
Status IndexChannel::check_intolerable_failure() {
- std::lock_guard<doris::SpinLock> l(_fail_lock);
+ std::lock_guard<std::mutex> l(_fail_lock);
return _intolerable_failure_status;
}
@@ -197,7 +197,7 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
std::vector<TErrorTabletInfo> error_tablet_infos;
{
- std::lock_guard<doris::SpinLock> l(_fail_lock);
+ std::lock_guard<std::mutex> l(_fail_lock);
for (const auto& it : _failed_channels_msgs) {
TErrorTabletInfo error_info;
error_info.__set_tabletId(it.first);
@@ -521,7 +521,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
if (_cancelled) {
- std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+ std::lock_guard<std::mutex> l(_cancel_msg_lock);
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("add row failed. {}",
_cancel_msg);
} else {
@@ -620,7 +620,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
void VNodeChannel::_cancel_with_msg(const std::string& msg) {
LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg;
{
- std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+ std::lock_guard<std::mutex> l(_cancel_msg_lock);
if (_cancel_msg.empty()) {
_cancel_msg = msg;
}
@@ -945,7 +945,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
if (_cancelled) {
- std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+ std::lock_guard<std::mutex> l(_cancel_msg_lock);
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("wait close failed. {}",
_cancel_msg);
} else {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index a17fb19c0ee..1dbeb6722ce 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -57,7 +57,6 @@
#include "runtime/thread_context.h"
#include "util/ref_count_closure.h"
#include "util/runtime_profile.h"
-#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/core/block.h"
@@ -264,7 +263,7 @@ public:
bool is_closed() const { return _is_closed; }
bool is_cancelled() const { return _cancelled; }
std::string get_cancel_msg() {
- std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
+ std::lock_guard<std::mutex> l(_cancel_msg_lock);
if (!_cancel_msg.empty()) {
return _cancel_msg;
}
@@ -342,7 +341,7 @@ protected:
// user cancel or get some errors
std::atomic<bool> _cancelled {false};
- doris::SpinLock _cancel_msg_lock;
+ std::mutex _cancel_msg_lock;
std::string _cancel_msg;
// send finished means the consumer thread which send the rpc can exit
@@ -516,7 +515,7 @@ private:
bool _has_inc_node = false;
// lock to protect _failed_channels and _failed_channels_msgs
- mutable doris::SpinLock _fail_lock;
+ mutable std::mutex _fail_lock;
// key is tablet_id, value is a set of failed node id
std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
// key is tablet_id, value is error message
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index 3859639539d..bc7fcf48bdc 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -48,7 +48,6 @@
#include "util/countdown_latch.h"
#include "util/random.h"
#include "util/scoped_cleanup.h"
-#include "util/spinlock.h"
#include "util/time.h"
using std::atomic;
@@ -652,11 +651,11 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
Random rng(seed);
// Protects 'tokens' and 'rng'.
- SpinLock lock;
+ std::mutex lock;
// Fetch a token from 'tokens' at random.
auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
- std::lock_guard<SpinLock> l(lock);
+ std::lock_guard<std::mutex> l(lock);
int idx = rng.Uniform(kNumTokens);
return tokens[idx];
};
@@ -665,7 +664,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
for (int i = 0; i < kNumTokens; i++) {
ThreadPool::ExecutionMode mode;
{
- std::lock_guard<SpinLock> l(lock);
+ std::lock_guard<std::mutex> l(lock);
mode = rng.Next() % 2 ? ThreadPool::ExecutionMode::SERIAL
: ThreadPool::ExecutionMode::CONCURRENT;
}
@@ -689,7 +688,7 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
int num_tokens_cycled = 0;
while (latch.count()) {
{
- std::lock_guard<SpinLock> l(lock);
+ std::lock_guard<std::mutex> l(lock);
int idx = rng.Uniform(kNumTokens);
ThreadPool::ExecutionMode mode =
rng.Next() % 2 ? ThreadPool::ExecutionMode::SERIAL
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index ccad0b1acb4..2783dc51e39 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -49,7 +49,6 @@
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "util/slice.h"
-#include "util/spinlock.h"
#include "util/timezone_utils.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]