This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9d3ee27e557 branch-2.1: [fix](restore) Add a local snapshot lock to protect snapshot dir #47279 (#47574)
9d3ee27e557 is described below
commit 9d3ee27e557d3134056f94d57bd0d38d3e2f4ae9
Author: walter <[email protected]>
AuthorDate: Sat Feb 8 09:49:22 2025 +0800
branch-2.1: [fix](restore) Add a local snapshot lock to protect snapshot dir #47279 (#47574)
cherry pick from #47279
---
be/src/olap/snapshot_manager.cpp | 33 +++++++++++-
be/src/olap/snapshot_manager.h | 50 +++++++++++++++++++
be/src/runtime/snapshot_loader.cpp | 100 ++++++++++++++++++-------------------
3 files changed, 131 insertions(+), 52 deletions(-)
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 56274b9a939..7a2119d6273 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -68,6 +68,35 @@ using namespace ErrorCode;
SnapshotManager* SnapshotManager::_s_instance = nullptr;
std::mutex SnapshotManager::_mlock;
+// Block until the calling thread holds the per-path lock for `path`, then hand back a
+// guard whose destructor gives the lock up again.
+LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) {
+    std::unique_lock<std::mutex> map_lock(_lock);
+    // operator[] creates the context on first use. References into an unordered_map
+    // survive rehashing, and release() never erases an entry while a waiter is counted.
+    LocalSnapshotContext& context = _local_snapshot_contexts[path];
+    if (context._is_locked) {
+        // Stay registered as a waiter for the whole time we sleep, so release() keeps the
+        // entry alive and notifies us instead of erasing it.
+        ++context._waiting_count;
+        context._cv.wait(map_lock, [&context] { return !context._is_locked; });
+        --context._waiting_count;
+    }
+    context._is_locked = true;
+    return {path};
+}
+
+// Give up the per-path lock for `path`. If other threads are blocked in acquire() on the
+// same path, one of them is woken; otherwise the bookkeeping entry is dropped so the map
+// only holds paths that are locked or contended. Unknown paths are ignored.
+void LocalSnapshotLock::release(const std::string& path) {
+    std::lock_guard<std::mutex> map_guard(_lock);
+    auto entry = _local_snapshot_contexts.find(path);
+    if (entry == _local_snapshot_contexts.end()) {
+        return;
+    }
+
+    LocalSnapshotContext& context = entry->second;
+    context._is_locked = false;
+    if (context._waiting_count == 0) {
+        // Nobody references this context any more, so it is safe to erase it.
+        _local_snapshot_contexts.erase(entry);
+        return;
+    }
+    // Waiters hold a reference to this context; keep it and wake exactly one of them.
+    context._cv.notify_one();
+}
+
SnapshotManager* SnapshotManager::instance() {
if (_s_instance == nullptr) {
std::lock_guard<std::mutex> lock(_mlock);
@@ -124,6 +153,8 @@ Status SnapshotManager::make_snapshot(const
TSnapshotRequest& request, string* s
}
Status SnapshotManager::release_snapshot(const string& snapshot_path) {
+ auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(snapshot_path);
+
// If the requested snapshot_path is located in the root/snapshot folder,
it is considered legal and can be deleted.
// Otherwise, it is considered an illegal request and returns an error
result.
SCOPED_ATTACH_TASK(_mem_tracker);
@@ -470,7 +501,7 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
}
}
// be would definitely set it as true no matter has missed version
or not
- // but it would take no effets on the following range loop
+ // but it would take no effects on the following range loop
if (!is_single_rowset_clone && request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index e32409dd3cd..6eaec3b1fdf 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
@@ -36,6 +37,55 @@ class RowsetMetaPB;
class TSnapshotRequest;
struct RowsetId;
+class LocalSnapshotLockGuard;
+
+// A per-path mutual-exclusion lock for local snapshot directories. Code that works on a
+// local snapshot path takes the lock for that path through instance().acquire(path) and
+// holds it for as long as the returned guard lives.
+// The lock is not reentrant: acquire() does not track the owner, so acquiring a path the
+// calling thread already holds blocks until that hold is released, i.e. it deadlocks.
+class LocalSnapshotLock {
+    friend class LocalSnapshotLockGuard;
+
+public:
+    LocalSnapshotLock() = default;
+    ~LocalSnapshotLock() = default;
+    LocalSnapshotLock(const LocalSnapshotLock&) = delete;
+    LocalSnapshotLock& operator=(const LocalSnapshotLock&) = delete;
+
+    // The shared instance. LocalSnapshotLockGuard always releases through this
+    // instance, so locks must be acquired through it as well.
+    static LocalSnapshotLock& instance() {
+        static LocalSnapshotLock instance;
+        return instance;
+    }
+
+    // Acquire the lock for the specified path. It will block if the lock is already held
+    // by another. The lock is held until the returned guard is destroyed, so the result
+    // must be bound to a variable: discarding it would release the lock immediately.
+    [[nodiscard]] LocalSnapshotLockGuard acquire(const std::string& path);
+
+private:
+    // Invoked from LocalSnapshotLockGuard's destructor.
+    void release(const std::string& path);
+
+    // Lock state of a single path. All fields are only accessed while `_lock` is held.
+    class LocalSnapshotContext {
+    public:
+        bool _is_locked = false;     // some guard currently holds this path
+        size_t _waiting_count = 0;   // threads blocked in acquire() on this path
+        std::condition_variable _cv; // notified by release() to wake one waiter
+
+        LocalSnapshotContext() = default;
+        LocalSnapshotContext(const LocalSnapshotContext&) = delete;
+        LocalSnapshotContext& operator=(const LocalSnapshotContext&) = delete;
+    };
+
+    std::mutex _lock; // protects _local_snapshot_contexts and every context inside it
+    std::unordered_map<std::string, LocalSnapshotContext> _local_snapshot_contexts;
+};
+
+// RAII handle for a path locked through LocalSnapshotLock::acquire(); the lock is
+// released when the guard is destroyed. Guards can be neither copied nor moved (the
+// user-declared copy constructor suppresses the implicit move), and only
+// LocalSnapshotLock can create them, so every guard stands for a lock that was actually
+// acquired. [[nodiscard]] makes the compiler warn when a returned guard is discarded,
+// which would release the lock right away.
+class [[nodiscard]] LocalSnapshotLockGuard {
+public:
+    LocalSnapshotLockGuard(const LocalSnapshotLockGuard&) = delete;
+    LocalSnapshotLockGuard& operator=(const LocalSnapshotLockGuard&) = delete;
+    ~LocalSnapshotLockGuard() { LocalSnapshotLock::instance().release(_snapshot_path); }
+
+private:
+    friend class LocalSnapshotLock;
+
+    // Private on purpose: a publicly constructible guard could "release" a path it never
+    // locked, clearing the lock underneath its real holder. Deliberately not `explicit`
+    // so LocalSnapshotLock::acquire() can keep building it with `return {path};`.
+    LocalSnapshotLockGuard(std::string path) : _snapshot_path(std::move(path)) {}
+
+    std::string _snapshot_path;
+};
+
class SnapshotManager {
public:
~SnapshotManager() {}
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index 1764e3d4322..2a6ba9274e2 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -31,6 +31,7 @@
#include <unistd.h>
#include <algorithm>
+#include <condition_variable>
#include <cstring>
#include <filesystem>
#include <istream>
@@ -150,6 +151,9 @@ Status SnapshotLoader::upload(const std::map<std::string,
std::string>& src_to_d
const std::string& src_path = iter->first;
const std::string& dest_path = iter->second;
+ // Take a lock to protect the local snapshot path.
+ auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(src_path);
+
int64_t tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(
@@ -247,6 +251,9 @@ Status SnapshotLoader::download(const std::map<std::string,
std::string>& src_to
const std::string& remote_path = iter->first;
const std::string& local_path = iter->second;
+ // Take a lock to protect the local snapshot path.
+ auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(local_path);
+
int64_t local_tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path,
&local_tablet_id,
@@ -403,8 +410,6 @@ Status SnapshotLoader::download(const std::map<std::string,
std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
- LOG(INFO) << fmt::format("begin to download snapshots via http. job: {},
task id: {}", _job_id,
- _task_id);
constexpr uint32_t kListRemoteFileTimeout = 15;
constexpr uint32_t kDownloadFileMaxRetry = 3;
constexpr uint32_t kGetLengthTimeout = 10;
@@ -414,35 +419,39 @@ Status SnapshotLoader::remote_http_download(
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
Status status = Status::OK();
- // Step before, validate all remote
-
- // Step 1: Validate local tablet snapshot paths
+ int report_counter = 0;
+ int finished_num = 0;
+ int total_num = remote_tablet_snapshots.size();
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
- const auto& path = remote_tablet_snapshot.local_snapshot_path;
+ const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+ const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+ LOG(INFO) << fmt::format(
+ "download snapshots via http. job: {}, task id: {}, local dir:
{}, remote dir: {}",
+ _job_id, _task_id, local_path, remote_path);
+
+ // Take a lock to protect the local snapshot path.
+ auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(local_path);
+
+ // Step 1: Validate local tablet snapshot paths
bool res = true;
- RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path,
&res));
+
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
if (!res) {
std::stringstream ss;
auto err_msg =
- fmt::format("snapshot path is not directory or does not
exist: {}", path);
+ fmt::format("snapshot path is not directory or does not
exist: {}", local_path);
LOG(WARNING) << err_msg;
return Status::RuntimeError(err_msg);
}
- }
- // Step 2: get all local files
- struct LocalFileStat {
- uint64_t size;
- std::string md5;
- };
- std::unordered_map<std::string, std::unordered_map<std::string,
LocalFileStat>> local_files_map;
- for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
- const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
- std::vector<std::string> local_files;
- RETURN_IF_ERROR(_get_existing_files_from_local(local_path,
&local_files));
-
- auto& local_filestat = local_files_map[local_path];
- for (auto& local_file : local_files) {
+ // Step 2: get all local files
+ struct LocalFileStat {
+ uint64_t size;
+ std::string md5;
+ };
+ std::unordered_map<std::string, LocalFileStat> local_files;
+ std::vector<std::string> existing_files;
+ RETURN_IF_ERROR(_get_existing_files_from_local(local_path,
&existing_files));
+ for (auto& local_file : existing_files) {
// add file size
std::string local_file_path = local_path + "/" + local_file;
std::error_code ec;
@@ -459,27 +468,20 @@ Status SnapshotLoader::remote_http_download(
<< " md5sum: " << status.to_string();
return status;
}
- local_filestat[local_file] = {local_file_size, md5};
+ local_files[local_file] = {local_file_size, md5};
}
- }
-
- // Step 3: Validate remote tablet snapshot paths && remote files map
- // key is remote snapshot paths, value is filelist
- // get all these use http download action
- //
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
- int report_counter = 0;
- int total_num = remote_tablet_snapshots.size();
- int finished_num = 0;
- struct RemoteFileStat {
- std::string url;
- std::string md5;
- uint64_t size;
- };
- std::unordered_map<std::string, std::unordered_map<std::string,
RemoteFileStat>>
- remote_files_map;
- for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
- const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
- auto& remote_files = remote_files_map[remote_path];
+ existing_files.clear();
+
+ // Step 3: Validate remote tablet snapshot paths && remote files map
+ // key is remote snapshot paths, value is filelist
+ // get all these use http download action
+ //
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+ struct RemoteFileStat {
+ std::string url;
+ std::string md5;
+ uint64_t size;
+ };
+ std::unordered_map<std::string, RemoteFileStat> remote_files;
const auto& token = remote_tablet_snapshot.remote_token;
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
@@ -520,19 +522,11 @@ Status SnapshotLoader::remote_http_download(
remote_files[filename] = RemoteFileStat {remote_file_url,
file_md5, file_size};
}
- }
- // Step 4: Compare local and remote files && get all need download files
- for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ // Step 4: Compare local and remote files && get all need download
files
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
TTaskType::type::DOWNLOAD));
- const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
- const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
- auto& remote_files = remote_files_map[remote_path];
- auto& local_files = local_files_map[local_path];
- auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
-
// get all need download files
std::vector<std::string> need_download_files;
for (const auto& [remote_file, remote_filestat] : remote_files) {
@@ -661,6 +655,7 @@ Status SnapshotLoader::remote_http_download(
if (total_time_ms > 0) {
copy_rate = total_file_size / ((double)total_time_ms) / 1000;
}
+ auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
LOG(INFO) << fmt::format(
"succeed to copy remote tablet {} to local tablet {}, total
file size: {} B, cost: "
"{} ms, rate: {} MB/s",
@@ -710,6 +705,9 @@ Status SnapshotLoader::remote_http_download(
// MUST hold tablet's header lock, push lock, cumulative lock and base
compaction lock
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr
tablet,
bool overwrite) {
+ // Take a lock to protect the local snapshot path.
+ auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(snapshot_path);
+
auto tablet_path = tablet->tablet_path();
auto store_path = tablet->data_dir()->path();
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ",
to: " << tablet_path
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]