This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7482b6bad2 [fix](cooldown) Add cold_compaction_lock to serialize any
operations which may delete the input rowsets of cold data compaction (#16742)
7482b6bad2 is described below
commit 7482b6bad2c368bda072ecca9dac0ad34ca9ba78
Author: plat1ko <[email protected]>
AuthorDate: Tue Feb 14 21:38:33 2023 +0800
[fix](cooldown) Add cold_compaction_lock to serialize any operations which
may delete the input rowsets of cold data compaction (#16742)
Add cold_compaction_lock to serialize tablet clone, cold data compaction,
and following cooldowned data, since each of them may delete rowsets.
---
be/src/olap/olap_server.cpp | 5 +++++
be/src/olap/tablet.cpp | 9 ++++++---
be/src/olap/tablet.h | 5 ++++-
be/src/olap/task/engine_clone_task.cpp | 16 ++++++++--------
4 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 707b77db5b..409abffc47 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -833,6 +833,11 @@ void
StorageEngine::_cold_data_compaction_producer_callback() {
std::lock_guard lock(tablet_submitted_mtx);
tablet_submitted.insert(t->tablet_id());
}
+ std::unique_lock
cold_compaction_lock(t->get_cold_compaction_lock(),
+ std::try_to_lock);
+ if (!cold_compaction_lock.owns_lock()) {
+ LOG(WARNING) << "try cold_compaction_lock failed,
tablet_id=" << t->tablet_id();
+ }
auto st = compaction->compact();
{
std::lock_guard lock(tablet_submitted_mtx);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a931b6c81b..f7b36a2d40 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1773,9 +1773,6 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
- std::vector to_add {std::move(new_rowset)};
- std::vector to_delete {std::move(old_rowset)};
-
{
std::unique_lock meta_wlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
@@ -1849,6 +1846,12 @@ Status
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
<< " cooldown_replica_id=" << cooldown_replica_id
<< " local replica=" << replica_id();
+ // MUST execute serially with cold data compaction, because compaction
input rowsets may be deleted by this function
+ std::unique_lock cold_compaction_lock(_cold_compaction_lock,
std::try_to_lock);
+ if (!cold_compaction_lock.owns_lock()) {
+ return Status::Error<TRY_LOCK_FAILED>("try cold_compaction_lock
failed");
+ }
+
TabletMetaPB cooldown_meta_pb;
RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id,
&cooldown_meta_pb));
DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f06463dcb3..f40450b947 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -331,6 +331,8 @@ public:
std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; }
uint32_t calc_cold_data_compaction_score() const;
+
+ std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
@@ -505,10 +507,11 @@ private:
bool _skip_base_compaction = false;
int64_t _skip_base_compaction_ts;
- // cooldown conf
+ // cooldown related
int64_t _cooldown_replica_id = -1;
int64_t _cooldown_term = -1;
std::shared_mutex _remote_files_lock;
+ std::mutex _cold_compaction_lock;
DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index d3f59653b7..923c3563c2 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -471,14 +471,6 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet,
const std::string& clone_d
int64_t committed_version, bool
is_incremental_clone) {
Defer remove_clone_dir {[&]() { std::filesystem::remove_all(clone_dir); }};
- // clone and compaction operation should be performed sequentially
- std::lock_guard<std::mutex>
base_compaction_lock(tablet->get_base_compaction_lock());
- std::lock_guard<std::mutex> cumulative_compaction_lock(
- tablet->get_cumulative_compaction_lock());
- tablet->set_clone_occurred(true);
- std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
- std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
- std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
// check clone dir existed
if (!FileUtils::check_exist(clone_dir)) {
return Status::InternalError("clone dir not existed. clone_dir={}",
clone_dir);
@@ -528,6 +520,14 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet,
const std::string& clone_d
linked_success_files.emplace_back(std::move(to));
}
+ // clone and compaction operation should be performed sequentially
+ std::lock_guard base_compaction_lock(tablet->get_base_compaction_lock());
+ std::lock_guard
cumulative_compaction_lock(tablet->get_cumulative_compaction_lock());
+ std::lock_guard cold_compaction_lock(tablet->get_cold_compaction_lock());
+ tablet->set_clone_occurred(true);
+ std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
+ std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
+ std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
if (is_incremental_clone) {
status = _finish_incremental_clone(tablet, cloned_tablet_meta,
committed_version);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]