This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4b9d6f9695b branch-3.0: [fix](cloud) Persist cluster_id file in
Compute-Storage Decoupled mode #53147 (#53195)
4b9d6f9695b is described below
commit 4b9d6f9695b1a8beea862c0aa6643d35bfdac3ec
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jul 14 21:39:57 2025 +0800
branch-3.0: [fix](cloud) Persist cluster_id file in Compute-Storage
Decoupled mode #53147 (#53195)
Cherry-picked from #53147
Co-authored-by: yagagagaga <[email protected]>
---
be/src/cloud/cloud_storage_engine.cpp | 77 +++++++++++++++++++++++++++++++++--
be/src/cloud/cloud_storage_engine.h | 12 +++---
be/src/runtime/exec_env_init.cpp | 2 +-
3 files changed, 82 insertions(+), 9 deletions(-)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 74a8f5068a4..6cac3c7f8d0 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -89,10 +89,11 @@ int get_base_thread_num() {
return std::min(std::max(int(num_cores *
config::base_compaction_thread_num_factor), 1), 10);
}
-CloudStorageEngine::CloudStorageEngine(const UniqueId& backend_uid)
- : BaseStorageEngine(Type::CLOUD, backend_uid),
+CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
+ : BaseStorageEngine(Type::CLOUD, options.backend_uid),
_meta_mgr(std::make_unique<cloud::CloudMetaMgr>()),
- _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)) {
+ _tablet_mgr(std::make_unique<CloudTabletMgr>(*this)),
+ _options(options) {
_cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
_cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
@@ -226,6 +227,9 @@ Status CloudStorageEngine::open() {
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");
+ // check cluster id
+ RETURN_NOT_OK_STATUS_WITH_WARN(_check_all_root_path_cluster_id(), "fail to
check cluster id");
+
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
@@ -1141,5 +1145,72 @@ Status
CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tabl
return Status::OK();
}
+// Validates the cluster id persisted under the store paths against each
+// other and against config::cluster_id.
+//
+// Reads the cluster id file (`<store path>/<CLUSTER_ID_PREFIX>`) of every
+// path in _options.store_paths, skipping paths where the file is missing
+// or empty, then:
+//   * no id on disk and config::cluster_id == -1: nothing to check, OK;
+//   * no id on disk and config::cluster_id set: persists the configured
+//     id through set_cluster_id() and returns its status;
+//   * more than one distinct id on disk: returns InternalError;
+//   * config::cluster_id set and different from the id on disk: returns
+//     Corruption.
+// Filesystem errors from exists/open_file/read_at are returned as-is.
+// Side effect: _effective_cluster_id is assigned config::cluster_id.
+Status CloudStorageEngine::_check_all_root_path_cluster_id() {
+    // Collect the distinct cluster ids found on disk.
+    std::set<int32_t> cluster_ids;
+    for (const auto& path : _options.store_paths) {
+        auto cluster_id_path =
+                fmt::format("{}/{}", path.path, CLUSTER_ID_PREFIX);
+        bool exists = false;
+        RETURN_IF_ERROR(io::global_local_filesystem()->exists(
+                cluster_id_path, &exists));
+        if (exists) {
+            io::FileReaderSPtr reader;
+            RETURN_IF_ERROR(io::global_local_filesystem()->open_file(
+                    cluster_id_path, &reader));
+            size_t fsize = reader->size();
+            if (fsize > 0) {
+                std::string content;
+                content.resize(fsize, '\0');
+                size_t bytes_read = 0;
+                RETURN_IF_ERROR(reader->read_at(
+                        0, {content.data(), fsize}, &bytes_read));
+                DCHECK_EQ(fsize, bytes_read);
+                // NOTE(review): std::stoi throws on non-numeric or
+                // out-of-range content; a corrupted file is not turned
+                // into a Status here.
+                int32_t tmp_cluster_id = std::stoi(content);
+                cluster_ids.insert(tmp_cluster_id);
+            }
+        }
+    }
+    _effective_cluster_id = config::cluster_id;
+    // first init
+    if (cluster_ids.empty()) {
+        // not set configured cluster id
+        if (_effective_cluster_id == -1) {
+            return Status::OK();
+        }
+        // No cluster id file exists yet: persist the configured id and
+        // stop here. cluster_ids is empty, so falling through to the
+        // comparison below would dereference cluster_ids.begin() on an
+        // empty set, which is undefined behavior.
+        return set_cluster_id(_effective_cluster_id);
+    }
+    if (cluster_ids.size() > 1) {
+        return Status::InternalError(
+                "All root paths must have the same cluster id, but you have "
+                "different cluster ids: {}",
+                fmt::join(cluster_ids, ", "));
+    }
+    // Exactly one id on disk from here on, so begin() is dereferenceable.
+    if (_effective_cluster_id != -1 &&
+        *cluster_ids.begin() != _effective_cluster_id) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::Corruption("multiple cluster ids is not equal. "
+                                   "config::cluster_id={}, "
+                                   "storage path cluster_id={}",
+                                   _effective_cluster_id,
+                                   *cluster_ids.begin()),
+                "cluster id not equal");
+    }
+    return Status::OK();
+}
+
+// Persists `cluster_id` under the store paths and records it as the
+// engine's effective cluster id.
+//
+// While holding _store_lock, creates `<store path>/<CLUSTER_ID_PREFIX>`
+// containing the decimal id for each path in _options.store_paths where
+// that file does not exist yet; files that already exist are left
+// untouched. The first filesystem error is returned immediately, in which
+// case _effective_cluster_id is not updated.
+Status CloudStorageEngine::set_cluster_id(int32_t cluster_id) {
+    std::lock_guard<std::mutex> guard(_store_lock);
+    for (auto& store_path : _options.store_paths) {
+        auto id_file =
+                fmt::format("{}/{}", store_path.path, CLUSTER_ID_PREFIX);
+        bool already_present = false;
+        RETURN_IF_ERROR(io::global_local_filesystem()->exists(
+                id_file, &already_present));
+        if (already_present) {
+            // Never overwrite an id that is already on disk.
+            continue;
+        }
+        io::FileWriterPtr writer;
+        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(
+                id_file, &writer));
+        RETURN_IF_ERROR(writer->append(std::to_string(cluster_id)));
+        RETURN_IF_ERROR(writer->close());
+    }
+    _effective_cluster_id = cluster_id;
+    return Status::OK();
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index f21e443a77e..b7aa2da94e8 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+#include <mutex>
//#include "cloud/cloud_cumulative_compaction.h"
//#include "cloud/cloud_base_compaction.h"
@@ -51,7 +52,7 @@ class CloudCompactionStopToken;
class CloudStorageEngine final : public BaseStorageEngine {
public:
- CloudStorageEngine(const UniqueId& backend_uid);
+ CloudStorageEngine(const EngineOptions& options);
~CloudStorageEngine() override;
@@ -64,10 +65,7 @@ public:
Status start_bg_threads() override;
- Status set_cluster_id(int32_t cluster_id) override {
- _effective_cluster_id = cluster_id;
- return Status::OK();
- }
+ Status set_cluster_id(int32_t cluster_id) override;
cloud::CloudMetaMgr& meta_mgr() const { return *_meta_mgr; }
@@ -169,6 +167,7 @@ private:
Status _request_tablet_global_compaction_lock(ReaderType compaction_type,
const CloudTabletSPtr&
tablet,
std::shared_ptr<CloudCompactionMixin> compaction);
+ Status _check_all_root_path_cluster_id();
void _lease_compaction_thread_callback();
void _check_tablet_delete_bitmap_score_callback();
@@ -221,6 +220,9 @@ private:
CumuPolices _cumulative_compaction_policies;
std::atomic_bool first_sync_storage_vault {true};
+
+ EngineOptions _options;
+ std::mutex _store_lock;
};
} // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8f30211602a..1d1ad2ee184 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -327,7 +327,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
if (config::is_cloud_mode()) {
std::cout << "start BE in cloud mode, cloud_unique_id: " <<
config::cloud_unique_id
<< ", meta_service_endpoint: " <<
config::meta_service_endpoint << std::endl;
- _storage_engine =
std::make_unique<CloudStorageEngine>(options.backend_uid);
+ _storage_engine = std::make_unique<CloudStorageEngine>(options);
} else {
std::cout << "start BE in local mode" << std::endl;
_storage_engine = std::make_unique<StorageEngine>(options);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]