This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1e9bea28fd1 branch-3.0: [improve](restore) Link existing rowset files with source rowset id #48435 (#48999)
1e9bea28fd1 is described below
commit 1e9bea28fd106ffc735abc5a02a69e87ebe8520e
Author: walter <[email protected]>
AuthorDate: Sat Mar 15 10:39:58 2025 +0800
branch-3.0: [improve](restore) Link existing rowset files with source rowset id #48435 (#48999)
cherry pick from #48435
---
be/src/cloud/pb_convert.cpp | 8 +
be/src/olap/snapshot_manager.cpp | 8 +-
be/src/olap/tablet_meta.cpp | 12 +-
be/src/olap/tablet_meta.h | 1 +
be/src/runtime/snapshot_loader.cpp | 890 ++++++++++++++-------
be/src/runtime/snapshot_loader.h | 3 +
be/src/service/backend_service.cpp | 8 +-
be/src/util/stopwatch.hpp | 5 +-
gensrc/proto/olap_file.proto | 7 +
.../ccr_syncer_p1/test_backup_restore.groovy | 9 +-
10 files changed, 652 insertions(+), 299 deletions(-)
diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index 2c51e97dd57..ec483ba682c 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -85,6 +85,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const
RowsetMetaPB& in)
out->set_has_variant_type_in_schema(in.has_has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
+ out->set_source_rowset_id(in.source_rowset_id());
+ out->set_source_tablet_id(in.source_tablet_id());
}
void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
@@ -137,6 +139,8 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out,
RowsetMetaPB&& in) {
out->set_has_variant_type_in_schema(in.has_variant_type_in_schema());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
+ out->set_source_rowset_id(in.source_rowset_id());
+ out->set_source_tablet_id(in.source_tablet_id());
}
static void fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB*
out,
@@ -235,6 +239,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const
RowsetMetaCloudPB& in,
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->mutable_inverted_index_file_info()->CopyFrom(in.inverted_index_file_info());
+ out->set_source_rowset_id(in.source_rowset_id());
+ out->set_source_tablet_id(in.source_tablet_id());
}
void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in,
@@ -288,6 +294,8 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out,
RowsetMetaCloudPB&& in,
out->set_enable_segments_file_size(in.enable_segments_file_size());
out->set_enable_inverted_index_file_info(in.enable_inverted_index_file_info());
out->mutable_inverted_index_file_info()->Swap(in.mutable_inverted_index_file_info());
+ out->set_source_rowset_id(in.source_rowset_id());
+ out->set_source_tablet_id(in.source_tablet_id());
}
TabletSchemaCloudPB doris_tablet_schema_to_cloud(const TabletSchemaPB& in) {
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 7f0e94274d9..a59ed36bb82 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -184,10 +184,8 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
// load original tablet meta
auto cloned_meta_file = fmt::format("{}/{}.hdr", clone_dir, tablet_id);
- TabletMeta cloned_tablet_meta;
-
RETURN_IF_ERROR_RESULT(cloned_tablet_meta.create_from_file(cloned_meta_file));
TabletMetaPB cloned_tablet_meta_pb;
- cloned_tablet_meta.to_meta_pb(&cloned_tablet_meta_pb);
+ RETURN_IF_ERROR_RESULT(TabletMeta::load_from_file(cloned_meta_file,
&cloned_tablet_meta_pb));
TabletMetaPB new_tablet_meta_pb;
new_tablet_meta_pb = cloned_tablet_meta_pb;
@@ -230,6 +228,8 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
src_rs_id.init(visible_rowset.rowset_id_v2());
}
rowset_id_mapping[src_rs_id] = rowset_id;
+ rowset_meta->set_source_rowset_id(visible_rowset.rowset_id_v2());
+
rowset_meta->set_source_tablet_id(cloned_tablet_meta_pb.tablet_id());
} else {
// remote rowset
*rowset_meta = visible_rowset;
@@ -265,6 +265,8 @@ Result<std::vector<PendingRowsetGuard>>
SnapshotManager::convert_rowset_ids(
src_rs_id.init(stale_rowset.rowset_id_v2());
}
rowset_id_mapping[src_rs_id] = rowset_id;
+ rowset_meta->set_source_rowset_id(stale_rowset.rowset_id_v2());
+
rowset_meta->set_source_tablet_id(cloned_tablet_meta_pb.tablet_id());
} else {
// remote rowset
*rowset_meta = stale_rowset;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 50a62899a9d..9bbb99a44b6 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -453,18 +453,22 @@ void TabletMeta::init_column_from_tcolumn(uint32_t
unique_id, const TColumn& tco
}
Status TabletMeta::create_from_file(const string& file_path) {
+ TabletMetaPB tablet_meta_pb;
+ RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb));
+ init_from_pb(tablet_meta_pb);
+ return Status::OK();
+}
+
+Status TabletMeta::load_from_file(const string& file_path, TabletMetaPB*
tablet_meta_pb) {
FileHeader<TabletMetaPB> file_header(file_path);
// In file_header.deserialize(), it validates file length, signature,
checksum of protobuf.
RETURN_IF_ERROR(file_header.deserialize());
- TabletMetaPB tablet_meta_pb;
try {
- tablet_meta_pb.CopyFrom(file_header.message());
+ tablet_meta_pb->CopyFrom(file_header.message());
} catch (...) {
return Status::Error<PARSE_PROTOBUF_ERROR>("fail to copy protocol
buffer object. file={}",
file_path);
}
-
- init_from_pb(tablet_meta_pb);
return Status::OK();
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index cc7daf8a67a..dc4be4f5ef0 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -129,6 +129,7 @@ public:
// Function create_from_file is used to be compatible with previous
tablet_meta.
// Previous tablet_meta is a physical file in tablet dir, which is not
stored in rocksdb.
Status create_from_file(const std::string& file_path);
+ static Status load_from_file(const std::string& file_path, TabletMetaPB*
tablet_meta_pb);
Status save(const std::string& file_path);
Status save_as_json(const string& file_path);
static Status save(const std::string& file_path, const TabletMetaPB&
tablet_meta_pb);
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index 9007a4cc2b6..fd204af7c8d 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -20,6 +20,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <fmt/format.h>
+#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
@@ -60,6 +61,108 @@
namespace doris {
+// Size and checksum of a file already present in the local snapshot directory.
+// `md5` stays empty unless enable_download_md5sum_check is enabled.
+struct LocalFileStat {
+    uint64_t size = 0;
+    std::string md5;
+};
+
+// Stat of a file in the remote snapshot, obtained with a HEAD request against the
+// remote BE's download endpoint. `md5` stays empty unless md5 checking is enabled.
+struct RemoteFileStat {
+    std::string url;
+    std::string md5;
+    uint64_t size = 0;
+};
+
+// Downloads one remote tablet snapshot over HTTP into a local snapshot directory.
+//
+// A remote file is not fetched again when a local file with the same name and size
+// (and md5, when enable_download_md5sum_check is on) already exists. Files of local
+// rowsets whose source_rowset_id refers to a remote rowset are hard-linked under the
+// remote rowset's name instead of being downloaded.
+class SnapshotHttpDownloader {
+public:
+    SnapshotHttpDownloader(const TRemoteTabletSnapshot& remote_tablet_snapshot,
+                           TabletSharedPtr tablet, SnapshotLoader& snapshot_loader)
+            : _tablet(std::move(tablet)),
+              _snapshot_loader(snapshot_loader),
+              _local_tablet_id(remote_tablet_snapshot.local_tablet_id),
+              _remote_tablet_id(remote_tablet_snapshot.remote_tablet_id),
+              _local_path(remote_tablet_snapshot.local_snapshot_path),
+              _remote_path(remote_tablet_snapshot.remote_snapshot_path),
+              _remote_be_addr(remote_tablet_snapshot.remote_be_addr) {
+        auto& token = remote_tablet_snapshot.remote_token;
+        auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+        // HEAD http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+        _base_url = fmt::format("http://{}:{}/api/_tablet/_download?token={}&channel=ingest_binlog",
+                                remote_be_addr.hostname, remote_be_addr.port, token);
+    }
+    ~SnapshotHttpDownloader() = default;
+    SnapshotHttpDownloader(const SnapshotHttpDownloader&) = delete;
+    SnapshotHttpDownloader& operator=(const SnapshotHttpDownloader&) = delete;
+
+    // Called before each remote file stat request and each file download;
+    // a non-OK status aborts download().
+    void set_report_progress_callback(std::function<Status()> report_progress) {
+        _report_progress_callback = std::move(report_progress);
+    }
+
+    // Synchronize the local snapshot directory with the remote snapshot.
+    // Returns the first error encountered.
+    Status download();
+
+private:
+    constexpr static int kDownloadFileMaxRetry = 3;
+
+    // Load existing files from the local snapshot path; compute their md5sum
+    // only if enable_download_md5sum_check is true.
+    Status _load_existing_files();
+
+    // List remote files from the remote BE and pick out the hdr file.
+    Status _list_remote_files();
+
+    // Download the remote hdr file into a tmp file in the local snapshot path.
+    Status _download_hdr_file();
+
+    // Link same rowset files by comparing the local and remote hdr files: if local
+    // files were copied from a remote rowset, link them under the remote rowset's
+    // file names to avoid downloading them again.
+    Status _link_same_rowset_files();
+
+    // Get the stats of all remote files, excluding the hdr file.
+    Status _get_remote_file_stats();
+
+    // Compute the files that must be downloaded, based on local file sizes and
+    // md5sums (the latter only if enable_download_md5sum_check is true).
+    void _get_need_download_files();
+
+    // Download all files selected by _get_need_download_files().
+    Status _download_files();
+
+    // Install the remote hdr file into the local snapshot path from the tmp file.
+    Status _install_remote_hdr_file();
+
+    // Delete local files that do not exist in the remote snapshot.
+    Status _delete_orphan_files();
+
+    // Download a file from the remote BE to a local path, verifying its size (and md5).
+    Status _download_http_file(DataDir* data_dir, const std::string& remote_file_url,
+                               const std::string& local_file_path,
+                               const RemoteFileStat& remote_filestat);
+
+    // Get the stat of a file from the remote BE.
+    Status _get_http_file_stat(const std::string& remote_file_url, RemoteFileStat* file_stat);
+
+    TabletSharedPtr _tablet;
+    SnapshotLoader& _snapshot_loader;
+    std::function<Status()> _report_progress_callback;
+
+    std::string _base_url;
+    int64_t _local_tablet_id;
+    int64_t _remote_tablet_id;
+    // Owned copies instead of references into the TRemoteTabletSnapshot passed to
+    // the constructor, so the downloader never depends on that object's lifetime.
+    std::string _local_path;
+    std::string _remote_path;
+    TNetworkAddress _remote_be_addr;
+
+    std::string _local_hdr_filename;
+    std::string _remote_hdr_filename;
+    std::vector<std::string> _remote_file_list;
+    std::unordered_map<std::string, LocalFileStat> _local_files;
+    std::unordered_map<std::string, RemoteFileStat> _remote_files;
+
+    std::string _tmp_hdr_file;
+    RemoteFileStat _remote_hdr_filestat;
+    std::vector<std::string> _need_download_files;
+};
+
static std::string get_loaded_tag_path(const std::string& snapshot_path) {
return snapshot_path + "/LOADED";
}
@@ -96,6 +199,473 @@ bool _end_with(std::string_view str, std::string_view
match) {
str.compare(str.size() - match.size(), match.size(), match) == 0;
}
+// Issue a HEAD request (with retries) for `remote_file_url` and report its size and,
+// when md5 checking is enabled, its md5. `*file_stat` is only written on success.
+Status SnapshotHttpDownloader::_get_http_file_stat(const std::string& remote_file_url,
+                                                   RemoteFileStat* file_stat) {
+    RemoteFileStat fetched {remote_file_url, {}, 0};
+
+    auto head_request = [&remote_file_url, &fetched](HttpClient* client) {
+        std::string url = remote_file_url;
+        int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
+        if (config::enable_download_md5sum_check) {
+            // The remote side computes the md5 on demand, which is slow: ask for it
+            // explicitly and allow three times the usual timeout.
+            url = fmt::format("{}&acquire_md5=true", remote_file_url);
+            timeout_ms = config::download_binlog_meta_timeout_ms * 3;
+        }
+        RETURN_IF_ERROR(client->init(url));
+        client->set_timeout_ms(timeout_ms);
+        RETURN_IF_ERROR(client->head());
+        RETURN_IF_ERROR(client->get_content_length(&fetched.size));
+        if (config::enable_download_md5sum_check) {
+            RETURN_IF_ERROR(client->get_content_md5(&fetched.md5));
+        }
+        return Status::OK();
+    };
+
+    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, head_request));
+    *file_stat = std::move(fetched);
+    return Status::OK();
+}
+
+// Download `remote_file_url` to `local_file_path` (with retries), then verify the
+// downloaded size against `remote_filestat.size` and, when the remote md5 is known,
+// the md5 as well. The file is finally made owner read/write.
+Status SnapshotHttpDownloader::_download_http_file(DataDir* data_dir,
+                                                   const std::string& remote_file_url,
+                                                   const std::string& local_file_path,
+                                                   const RemoteFileStat& remote_filestat) {
+    auto file_size = remote_filestat.size;
+    const auto& remote_file_md5 = remote_filestat.md5;
+
+    // check disk capacity
+    if (data_dir->reach_capacity_limit(file_size)) {
+        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                "reach the capacity limit of path {}, file_size={}", data_dir->path(), file_size);
+    }
+
+    // Timeout in seconds: the time needed to transfer the file at the lowest accepted
+    // speed, but never less than download_low_speed_time. A non-positive speed limit
+    // would divide by zero (or by a meaningless value), so only the minimum applies then.
+    uint64_t estimate_timeout = 0;
+    if (config::download_low_speed_limit_kbps > 0) {
+        estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+    }
+    if (estimate_timeout < config::download_low_speed_time) {
+        estimate_timeout = config::download_low_speed_time;
+    }
+
+    LOG(INFO) << "clone begin to download file from: " << remote_file_url
+              << " to: " << local_file_path << ". size(B): " << file_size
+              << ", timeout(s): " << estimate_timeout;
+
+    auto download_cb = [&remote_file_url, &remote_file_md5, estimate_timeout, &local_file_path,
+                        file_size](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(remote_file_url));
+        client->set_timeout_ms(estimate_timeout * 1000);
+        RETURN_IF_ERROR(client->download(local_file_path));
+
+        std::error_code ec;
+        // Check file length
+        uint64_t local_file_size = std::filesystem::file_size(local_file_path, ec);
+        if (ec) {
+            LOG(WARNING) << "download file error: " << ec.message();
+            return Status::IOError("can't retrive file_size of {}, due to {}", local_file_path,
+                                   ec.message());
+        }
+        if (local_file_size != file_size) {
+            LOG(WARNING) << "download file length error"
+                         << ", remote_path=" << remote_file_url << ", file_size=" << file_size
+                         << ", local_file_size=" << local_file_size;
+            return Status::InternalError("downloaded file size is not equal");
+        }
+
+        if (!remote_file_md5.empty()) { // keep compatibility
+            std::string local_file_md5;
+            RETURN_IF_ERROR(
+                    io::global_local_filesystem()->md5sum(local_file_path, &local_file_md5));
+            if (local_file_md5 != remote_file_md5) {
+                LOG(WARNING) << "download file md5 error"
+                             << ", remote_file_url=" << remote_file_url
+                             << ", local_file_path=" << local_file_path
+                             << ", remote_file_md5=" << remote_file_md5
+                             << ", local_file_md5=" << local_file_md5;
+                return Status::RuntimeError(
+                        "download file {} md5 is not equal, local={}, remote={}", remote_file_url,
+                        local_file_md5, remote_file_md5);
+            }
+        }
+
+        return io::global_local_filesystem()->permission(local_file_path,
+                                                         io::LocalFileSystem::PERMS_OWNER_RW);
+    };
+    auto status = HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to download file from " << remote_file_url
+                     << ", status: " << status.to_string();
+        return status;
+    }
+
+    return Status::OK();
+}
+
+// Record size (and, if enable_download_md5sum_check is set, md5) of every file already
+// in the local snapshot directory, and remember the local hdr file name if there is one.
+Status SnapshotHttpDownloader::_load_existing_files() {
+    std::vector<std::string> filenames;
+    RETURN_IF_ERROR(_snapshot_loader._get_existing_files_from_local(_local_path, &filenames));
+
+    for (const std::string& name : filenames) {
+        const std::string full_path = _local_path + "/" + name;
+
+        std::error_code ec;
+        const uint64_t size = std::filesystem::file_size(full_path, ec);
+        if (ec) {
+            LOG(WARNING) << "download file error, can't retrive file_size of " << full_path
+                         << ", due to " << ec.message();
+            return Status::IOError("can't retrive file_size of {}, due to {}", full_path,
+                                   ec.message());
+        }
+
+        // Left empty when md5 checking is disabled.
+        std::string checksum;
+        if (config::enable_download_md5sum_check) {
+            auto st = io::global_local_filesystem()->md5sum(full_path, &checksum);
+            if (!st.ok()) {
+                LOG(WARNING) << "download file error, local file " << full_path
+                             << " md5sum: " << st.to_string();
+                return st;
+            }
+        }
+
+        _local_files[name] = {size, checksum};
+        if (name.ends_with(".hdr")) {
+            _local_hdr_filename = name;
+        }
+    }
+
+    return Status::OK();
+}
+
+// Fetch the newline-separated listing of the remote snapshot directory. The first
+// entry ending in ".hdr" becomes _remote_hdr_filename and is removed from
+// _remote_file_list; the remaining entries are the data files.
+Status SnapshotHttpDownloader::_list_remote_files() {
+    // e.g. http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+    const std::string list_url = fmt::format("{}&file={}", _base_url, _remote_path);
+
+    LOG(INFO) << "list remote files: " << list_url << ", job: " << _snapshot_loader._job_id
+              << ", task id: " << _snapshot_loader._task_id << ", remote be: " << _remote_be_addr;
+
+    std::string listing;
+    auto list_request = [&list_url, &listing](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(list_url));
+        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
+        return client->execute(&listing);
+    };
+    RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, list_request));
+
+    _remote_file_list = strings::Split(listing, "\n", strings::SkipWhitespace());
+
+    auto is_hdr = [](const std::string& name) { return _end_with(name, ".hdr"); };
+    auto hdr_it = std::find_if(_remote_file_list.begin(), _remote_file_list.end(), is_hdr);
+    if (hdr_it == _remote_file_list.end()) {
+        std::string msg =
+                fmt::format("can't find hdr file in remote snapshot path: {}", _remote_path);
+        LOG(WARNING) << msg;
+        return Status::RuntimeError(std::move(msg));
+    }
+
+    _remote_hdr_filename = *hdr_it;
+    _remote_file_list.erase(hdr_it);
+    return Status::OK();
+}
+
+// Stat and download the remote hdr file into "<local_path>/<remote hdr name>.tmp".
+// The tmp name is used because the current local hdr is still read by
+// _link_same_rowset_files and is only replaced later by _install_remote_hdr_file.
+Status SnapshotHttpDownloader::_download_hdr_file() {
+    const std::string url =
+            fmt::format("{}&file={}/{}", _base_url, _remote_path, _remote_hdr_filename);
+
+    RemoteFileStat stat;
+    if (auto st = _get_http_file_stat(url, &stat); !st.ok()) {
+        LOG(WARNING) << "failed to get remote hdr file stat: " << url
+                     << ", error: " << st.to_string();
+        return st;
+    }
+
+    const std::string tmp_path = _local_path + "/" + _remote_hdr_filename + ".tmp";
+    if (auto st = _download_http_file(_tablet->data_dir(), url, tmp_path, stat); !st.ok()) {
+        LOG(WARNING) << "failed to download remote hdr file: " << url
+                     << ", error: " << st.to_string();
+        return st;
+    }
+
+    _tmp_hdr_file = tmp_path;
+    _remote_hdr_filestat = stat;
+    return Status::OK();
+}
+
+// For every local (non-resource) rowset whose source_rowset_id names a remote rowset
+// with the same version range, hard-link its local files under the remote rowset id so
+// they do not have to be downloaded again. A local hdr that cannot be parsed disables
+// this optimization (returns OK); an unreadable remote hdr or a failed link is an error.
+Status SnapshotHttpDownloader::_link_same_rowset_files() {
+    std::string local_hdr_file_path = _local_path + "/" + _local_hdr_filename;
+
+    // load local tablet meta
+    TabletMetaPB local_tablet_meta;
+    auto status = TabletMeta::load_from_file(local_hdr_file_path, &local_tablet_meta);
+    if (!status.ok()) {
+        // This file might be broken because of a partial download; skip linking and
+        // let the normal download path fetch everything.
+        LOG(WARNING) << "failed to load local tablet meta: " << local_hdr_file_path
+                     << ", skip link same rowset files, error: " << status.to_string();
+        return Status::OK();
+    }
+
+    // load remote tablet meta
+    TabletMetaPB remote_tablet_meta;
+    status = TabletMeta::load_from_file(_tmp_hdr_file, &remote_tablet_meta);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to load remote tablet meta: " << _tmp_hdr_file
+                     << ", error: " << status.to_string();
+        return status;
+    }
+
+    LOG(INFO) << "link rowset files by compare " << _local_hdr_filename << " and "
+              << _remote_hdr_filename;
+
+    // Remote rowsets stored locally on the remote BE, keyed by rowset id. The pointers
+    // refer into remote_tablet_meta, which outlives this map.
+    std::unordered_map<std::string, const RowsetMetaPB*> remote_rowset_metas;
+    for (const auto& rowset_meta : remote_tablet_meta.rs_metas()) {
+        if (rowset_meta.has_resource_id()) { // skip remote rowset
+            continue;
+        }
+        remote_rowset_metas.emplace(rowset_meta.rowset_id_v2(), &rowset_meta);
+    }
+
+    for (const auto& local_rowset_meta : local_tablet_meta.rs_metas()) {
+        if (local_rowset_meta.has_resource_id() || !local_rowset_meta.has_source_rowset_id()) {
+            continue;
+        }
+
+        auto it = remote_rowset_metas.find(local_rowset_meta.source_rowset_id());
+        if (it == remote_rowset_metas.end()) {
+            continue;
+        }
+
+        const auto& remote_rowset_id = it->first;
+        const RowsetMetaPB& remote_rowset_meta_pb = *it->second;
+        const auto& local_rowset_id = local_rowset_meta.rowset_id_v2();
+        auto remote_tablet_id = remote_rowset_meta_pb.tablet_id();
+        if (local_rowset_meta.start_version() != remote_rowset_meta_pb.start_version() ||
+            local_rowset_meta.end_version() != remote_rowset_meta_pb.end_version()) {
+            continue;
+        }
+
+        LOG(INFO) << "rowset " << local_rowset_id << " was downloaded from remote tablet "
+                  << remote_tablet_id << " rowset " << remote_rowset_id
+                  << ", directly link files instead of downloading";
+
+        // New entries for _local_files are collected here and inserted only after the
+        // loop below: inserting into an unordered_map while range-iterating over it may
+        // rehash the table and invalidate the loop iterator (undefined behavior).
+        std::vector<std::pair<std::string, LocalFileStat>> linked_files;
+
+        // Since the rowset meta are the same, we can link the local rowset files as
+        // the downloaded remote rowset files.
+        for (const auto& [local_file, local_filestat] : _local_files) {
+            if (!local_file.starts_with(local_rowset_id)) {
+                continue;
+            }
+
+            std::string remote_file = local_file;
+            remote_file.replace(0, local_rowset_id.size(), remote_rowset_id);
+            std::string local_file_path = _local_path + "/" + local_file;
+            std::string remote_file_path = _local_path + "/" + remote_file;
+
+            bool exist = true;
+            RETURN_IF_ERROR(io::global_local_filesystem()->exists(remote_file_path, &exist));
+            if (exist) {
+                continue;
+            }
+
+            LOG(INFO) << "link file from " << local_file_path << " to " << remote_file_path;
+            if (!io::global_local_filesystem()->link_file(local_file_path, remote_file_path)) {
+                std::string msg = fmt::format("link file failed from {} to {}, err: {}",
+                                              local_file_path, remote_file_path, strerror(errno));
+                LOG(WARNING) << msg;
+                return Status::InternalError(std::move(msg));
+            }
+
+            linked_files.emplace_back(std::move(remote_file), local_filestat);
+        }
+
+        for (auto& [file, filestat] : linked_files) {
+            _local_files.insert_or_assign(std::move(file), std::move(filestat));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Stat every remote data file (the hdr was already removed from _remote_file_list)
+// and store the results in _remote_files, keyed by remote file name.
+Status SnapshotHttpDownloader::_get_remote_file_stats() {
+    for (const std::string& remote_name : _remote_file_list) {
+        // Let the caller report progress between requests; non-OK aborts the loop.
+        if (_report_progress_callback) {
+            RETURN_IF_ERROR(_report_progress_callback());
+        }
+
+        RemoteFileStat stat;
+        RETURN_IF_ERROR(_get_http_file_stat(
+                fmt::format("{}&file={}/{}", _base_url, _remote_path, remote_name), &stat));
+        _remote_files[remote_name] = std::move(stat);
+    }
+
+    return Status::OK();
+}
+
+// Select the remote files that must be fetched: those missing locally, or whose local
+// size or md5 differs. With md5 checking disabled both md5 strings are empty, so only
+// the size is effectively compared.
+void SnapshotHttpDownloader::_get_need_download_files() {
+    for (const auto& [name, remote] : _remote_files) {
+        LOG(INFO) << "remote file: " << name << ", size: " << remote.size
+                  << ", md5: " << remote.md5;
+
+        const auto local = _local_files.find(name);
+        const bool up_to_date = local != _local_files.end() &&
+                                local->second.size == remote.size &&
+                                local->second.md5 == remote.md5;
+        if (!up_to_date) {
+            _need_download_files.emplace_back(name);
+            continue;
+        }
+
+        LOG(INFO) << fmt::format("file {} already exists, skip download url {}", name,
+                                 remote.url);
+    }
+}
+
+// Download every file in _need_download_files into the local snapshot directory
+// (under its local name per _replace_tablet_id), record it in _local_files, and log
+// the total size, elapsed time and transfer rate.
+Status SnapshotHttpDownloader::_download_files() {
+    DataDir* const data_dir = _tablet->data_dir();
+
+    uint64_t downloaded_bytes = 0;
+    MonotonicStopWatch watch(true);
+    for (const std::string& remote_name : _need_download_files) {
+        if (_report_progress_callback) {
+            RETURN_IF_ERROR(_report_progress_callback());
+        }
+
+        const RemoteFileStat& stat = _remote_files[remote_name];
+
+        std::string local_name;
+        RETURN_IF_ERROR(
+                _snapshot_loader._replace_tablet_id(remote_name, _local_tablet_id, &local_name));
+        const std::string target_path = _local_path + "/" + local_name;
+
+        RETURN_IF_ERROR(_download_http_file(data_dir, stat.url, target_path, stat));
+        downloaded_bytes += stat.size;
+
+        // Keep _local_files in sync with what is now on disk.
+        _local_files[remote_name] = LocalFileStat {stat.size, stat.md5};
+    }
+
+    const uint64_t elapsed_ms = watch.elapsed_time() / 1000 / 1000;
+    const double copy_rate =
+            elapsed_ms > 0 ? downloaded_bytes / static_cast<double>(elapsed_ms) / 1000 : 0.0;
+    LOG(INFO) << fmt::format(
+            "succeed to copy remote tablet {} to local tablet {}, total downloading {} files, "
+            "total file size: {} B, cost: {} ms, rate: {} MB/s",
+            _remote_tablet_id, _local_tablet_id, _need_download_files.size(), downloaded_bytes,
+            elapsed_ms, copy_rate);
+
+    return Status::OK();
+}
+
+// Move the downloaded tmp hdr file to its local name (tablet id replaced with the
+// local tablet id) and register the hdr in _remote_files so that
+// _delete_orphan_files keeps it.
+Status SnapshotHttpDownloader::_install_remote_hdr_file() {
+    std::string local_hdr_filename;
+    RETURN_IF_ERROR(_snapshot_loader._replace_tablet_id(_remote_hdr_filename, _local_tablet_id,
+                                                        &local_hdr_filename));
+    std::string local_hdr_file_path = _local_path + "/" + local_hdr_filename;
+
+    auto status = io::global_local_filesystem()->rename(_tmp_hdr_file, local_hdr_file_path);
+    if (!status.ok()) {
+        // Note the spaces around "to": previously the path was glued to the word.
+        LOG(WARNING) << "failed to install remote hdr file from: " << _tmp_hdr_file << " to "
+                     << local_hdr_file_path << ", error: " << status.to_string();
+        return Status::RuntimeError("failed install remote hdr file {} from tmp {}, error: {}",
+                                    local_hdr_file_path, _tmp_hdr_file, status.to_string());
+    }
+
+    // also save the hdr file into remote files.
+    _remote_files[_remote_hdr_filename] = _remote_hdr_filestat;
+
+    return Status::OK();
+}
+
+// _local_files now lists every file in the local snapshot directory (pre-existing,
+// linked and downloaded). Remove, best effort, each one that has no counterpart in
+// _remote_files once its name is translated to the remote tablet id.
+Status SnapshotHttpDownloader::_delete_orphan_files() {
+    for (const auto& [name, stat] : _local_files) {
+        std::string remote_name;
+        auto st = _snapshot_loader._replace_tablet_id(name, _remote_tablet_id, &remote_name);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to replace tablet id. unknown local file: " << st
+                         << ". ignore it";
+            continue;
+        }
+        VLOG_CRITICAL << "new file name after replace tablet id: " << remote_name;
+
+        if (_remote_files.contains(remote_name)) {
+            continue;
+        }
+
+        const std::string path = _local_path + "/" + name;
+        LOG(INFO) << "begin to delete local snapshot file: " << path
+                  << ", it does not exist in remote";
+        if (remove(path.c_str()) != 0) {
+            LOG(WARNING) << "failed to delete unknown local file: " << path
+                         << ", error: " << strerror(errno) << ", file size: " << stat.size
+                         << ", ignore it";
+        }
+    }
+    return Status::OK();
+}
+
+// Synchronize the local snapshot directory with the remote snapshot, holding the
+// LocalSnapshotLock for _local_path for the whole operation.
+Status SnapshotHttpDownloader::download() {
+    auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(_local_path);
+
+    // Step 1: the local snapshot path must be an existing directory.
+    bool is_dir = true;
+    RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(_local_path, &is_dir));
+    if (!is_dir) {
+        std::string msg =
+                fmt::format("snapshot path is not directory or does not exist: {}", _local_path);
+        LOG(WARNING) << msg;
+        return Status::RuntimeError(std::move(msg));
+    }
+
+    // Steps 2-4: inventory local files, list remote files, fetch the remote hdr to a tmp file.
+    RETURN_IF_ERROR(_load_existing_files());
+    RETURN_IF_ERROR(_list_remote_files());
+    RETURN_IF_ERROR(_download_hdr_file());
+
+    // Step 5: reuse local rowset files copied from the same remote rowsets (needs a local hdr).
+    if (!_local_hdr_filename.empty()) {
+        RETURN_IF_ERROR(_link_same_rowset_files());
+    }
+
+    // Steps 6-7: stat the remote files, then fetch the ones missing or differing locally.
+    RETURN_IF_ERROR(_get_remote_file_stats());
+    _get_need_download_files();
+    if (!_need_download_files.empty()) {
+        RETURN_IF_ERROR(_download_files());
+    }
+
+    // Steps 8-9: put the new hdr in place and drop files the remote does not have.
+    RETURN_IF_ERROR(_install_remote_hdr_file());
+    RETURN_IF_ERROR(_delete_orphan_files());
+    return Status::OK();
+}
+
SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t
job_id, int64_t task_id,
const TNetworkAddress& broker_addr,
const std::map<std::string, std::string>& prop)
@@ -413,11 +983,6 @@ Status SnapshotLoader::download(const
std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
- LOG(INFO) << fmt::format("begin to download snapshots via http. job: {},
task id: {}", _job_id,
- _task_id);
-
- constexpr uint32_t kDownloadFileMaxRetry = 3;
-
// check if job has already been cancelled
int tmp_counter = 1;
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
@@ -427,281 +992,28 @@ Status SnapshotLoader::remote_http_download(
int finished_num = 0;
int total_num = remote_tablet_snapshots.size();
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+ auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+
LOG(INFO) << fmt::format(
"download snapshots via http. job: {}, task id: {}, local dir:
{}, remote dir: {}",
_job_id, _task_id, local_path, remote_path);
- // Take a lock to protect the local snapshot path.
- auto local_snapshot_guard =
LocalSnapshotLock::instance().acquire(local_path);
-
- // Step 1: Validate local tablet snapshot paths
- bool res = true;
-
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
- if (!res) {
- std::stringstream ss;
- auto err_msg =
- fmt::format("snapshot path is not directory or does not
exist: {}", local_path);
- LOG(WARNING) << err_msg;
- return Status::RuntimeError(err_msg);
- }
-
- // Step 2: get all local files
- struct LocalFileStat {
- uint64_t size;
- std::string md5;
- };
- std::unordered_map<std::string, LocalFileStat> local_files;
- std::vector<std::string> existing_files;
- RETURN_IF_ERROR(_get_existing_files_from_local(local_path,
&existing_files));
- for (auto& local_file : existing_files) {
- // add file size
- std::string local_file_path = local_path + "/" + local_file;
- std::error_code ec;
- uint64_t local_file_size =
std::filesystem::file_size(local_file_path, ec);
- if (ec) {
- LOG(WARNING) << "download file error" << ec.message();
- return Status::IOError("can't retrive file_size of {}, due to
{}", local_file_path,
- ec.message());
- }
- std::string md5;
- auto status =
io::global_local_filesystem()->md5sum(local_file_path, &md5);
- if (!status.ok()) {
- LOG(WARNING) << "download file error, local file " <<
local_file_path
- << " md5sum: " << status.to_string();
- return status;
- }
- local_files[local_file] = {local_file_size, md5};
- }
- existing_files.clear();
-
- // Step 3: Validate remote tablet snapshot paths && remote files map
- // key is remote snapshot paths, value is filelist
- // get all these use http download action
- //
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
- struct RemoteFileStat {
- std::string url;
- std::string md5;
- uint64_t size;
- };
- std::unordered_map<std::string, RemoteFileStat> remote_files;
- const auto& token = remote_tablet_snapshot.remote_token;
- const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
-
- // HEAD
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
- std::string base_url =
fmt::format("http://{}:{}/api/_tablet/_download?token={}",
- remote_be_addr.hostname,
remote_be_addr.port, token);
- std::string remote_url_prefix = fmt::format("{}&file={}", base_url,
remote_path);
-
- LOG(INFO) << "list remote files: " << remote_url_prefix << ", job: "
<< _job_id
- << ", task id: " << _task_id << ", remote be: " <<
remote_be_addr;
- string file_list_str;
- auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient*
client) {
- RETURN_IF_ERROR(client->init(remote_url_prefix));
- client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
- return client->execute(&file_list_str);
- };
- RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry,
1, list_files_cb));
- std::vector<string> filename_list =
- strings::Split(file_list_str, "\n", strings::SkipWhitespace());
-
- for (const auto& filename : filename_list) {
- std::string remote_file_url =
- fmt::format("{}&file={}/{}&channel=ingest_binlog",
base_url,
- remote_tablet_snapshot.remote_snapshot_path,
filename);
-
- // get file length
- uint64_t file_size = 0;
- std::string file_md5;
- auto get_file_stat_cb = [&remote_file_url, &file_size,
&file_md5](HttpClient* client) {
- int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
- std::string url = remote_file_url;
- if (config::enable_download_md5sum_check) {
- // compute md5sum is time-consuming, so we set a longer
timeout
- timeout_ms = config::download_binlog_meta_timeout_ms * 3;
- url = fmt::format("{}&acquire_md5=true", remote_file_url);
- }
- RETURN_IF_ERROR(client->init(url));
- client->set_timeout_ms(timeout_ms);
- RETURN_IF_ERROR(client->head());
- RETURN_IF_ERROR(client->get_content_length(&file_size));
- if (config::enable_download_md5sum_check) {
- RETURN_IF_ERROR(client->get_content_md5(&file_md5));
- }
- return Status::OK();
- };
- RETURN_IF_ERROR(
- HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1,
get_file_stat_cb));
-
- remote_files[filename] = RemoteFileStat {remote_file_url,
file_md5, file_size};
- }
-
- // Step 4: Compare local and remote files && get all need download
files
- RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
- TTaskType::type::DOWNLOAD));
-
- // get all need download files
- std::vector<std::string> need_download_files;
- for (const auto& [remote_file, remote_filestat] : remote_files) {
- LOG(INFO) << "remote file: " << remote_file << ", size: " <<
remote_filestat.size
- << ", md5: " << remote_filestat.md5;
- auto it = local_files.find(remote_file);
- if (it == local_files.end()) {
- need_download_files.emplace_back(remote_file);
- continue;
- }
- if (_end_with(remote_file, ".hdr")) {
- need_download_files.emplace_back(remote_file);
- continue;
- }
-
- if (auto& local_filestat = it->second; local_filestat.size !=
remote_filestat.size) {
- need_download_files.emplace_back(remote_file);
- continue;
- }
-
- if (auto& local_filestat = it->second; local_filestat.md5 !=
remote_filestat.md5) {
- need_download_files.emplace_back(remote_file);
- continue;
- }
-
- LOG(INFO) << fmt::format("file {} already exists, skip download",
remote_file);
- }
-
- auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
TabletSharedPtr tablet =
_engine.tablet_manager()->get_tablet(local_tablet_id);
if (tablet == nullptr) {
- std::stringstream ss;
- ss << "failed to get local tablet: " << local_tablet_id;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
+ std::string msg = fmt::format("failed to get local tablet: {}",
local_tablet_id);
+ LOG(WARNING) << msg;
+ return Status::RuntimeError(std::move(msg));
}
- DataDir* data_dir = tablet->data_dir();
-
- // download all need download files
- uint64_t total_file_size = 0;
- MonotonicStopWatch watch;
- watch.start();
- for (auto& filename : need_download_files) {
- auto& remote_filestat = remote_files[filename];
- auto file_size = remote_filestat.size;
- auto& remote_file_url = remote_filestat.url;
- auto& remote_file_md5 = remote_filestat.md5;
- // check disk capacity
- if (data_dir->reach_capacity_limit(file_size)) {
- return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
- "reach the capacity limit of path {}, file_size={}",
data_dir->path(),
- file_size);
- }
-
- total_file_size += file_size;
- uint64_t estimate_timeout = file_size /
config::download_low_speed_limit_kbps / 1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
- std::string local_filename;
- RETURN_IF_ERROR(_replace_tablet_id(filename, local_tablet_id,
&local_filename));
- std::string local_file_path = local_path + "/" + local_filename;
-
- LOG(INFO) << "clone begin to download file from: " <<
remote_file_url
- << " to: " << local_file_path << ". size(B): " <<
file_size
- << ", timeout(s): " << estimate_timeout;
-
- auto download_cb = [&remote_file_url, &remote_file_md5,
estimate_timeout,
- &local_file_path, file_size](HttpClient*
client) {
- RETURN_IF_ERROR(client->init(remote_file_url));
- client->set_timeout_ms(estimate_timeout * 1000);
- RETURN_IF_ERROR(client->download(local_file_path));
-
- std::error_code ec;
- // Check file length
- uint64_t local_file_size =
std::filesystem::file_size(local_file_path, ec);
- if (ec) {
- LOG(WARNING) << "download file error" << ec.message();
- return Status::IOError("can't retrive file_size of {}, due
to {}",
- local_file_path, ec.message());
- }
- if (local_file_size != file_size) {
- LOG(WARNING) << "download file length error"
- << ", remote_path=" << remote_file_url
- << ", file_size=" << file_size
- << ", local_file_size=" << local_file_size;
- return Status::InternalError("downloaded file size is not
equal");
- }
-
- if (!remote_file_md5.empty()) { // keep compatibility
- std::string local_file_md5;
-
RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(local_file_path,
-
&local_file_md5));
- if (local_file_md5 != remote_file_md5) {
- LOG(WARNING) << "download file md5 error"
- << ", remote_file_url=" << remote_file_url
- << ", local_file_path=" << local_file_path
- << ", remote_file_md5=" << remote_file_md5
- << ", local_file_md5=" << local_file_md5;
- return Status::RuntimeError(
- "download file {} md5 is not equal, local={},
remote={}",
- remote_file_url, local_file_md5,
remote_file_md5);
- }
- }
-
- return io::global_local_filesystem()->permission(
- local_file_path, io::LocalFileSystem::PERMS_OWNER_RW);
- };
- auto status =
HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, download_cb);
- if (!status.ok()) {
- LOG(WARNING) << "failed to download file from " <<
remote_file_url
- << ", status: " << status.to_string();
- return status;
- }
-
- // local_files always keep the updated local files
- local_files[filename] = LocalFileStat {file_size, remote_file_md5};
- }
-
- uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
- total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
- double copy_rate = 0.0;
- if (total_time_ms > 0) {
- copy_rate = total_file_size / ((double)total_time_ms) / 1000;
- }
- auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
- LOG(INFO) << fmt::format(
- "succeed to copy remote tablet {} to local tablet {}, total
file size: {} B, cost: "
- "{} ms, rate: {} MB/s",
- remote_tablet_id, local_tablet_id, total_file_size,
total_time_ms, copy_rate);
-
- // local_files: contain all remote files and local files
- // finally, delete local files which are not in remote
- for (const auto& [local_file, local_filestat] : local_files) {
- // replace the tablet id in local file name with the remote tablet
id,
- // in order to compare the file name.
- std::string new_name;
- Status st = _replace_tablet_id(local_file, remote_tablet_id,
&new_name);
- if (!st.ok()) {
- LOG(WARNING) << "failed to replace tablet id. unknown local
file: " << st
- << ". ignore it";
- continue;
- }
- VLOG_CRITICAL << "new file name after replace tablet id: " <<
new_name;
- const auto& find = remote_files.find(new_name);
- if (find != remote_files.end()) {
- continue;
- }
-
- // delete
- std::string full_local_file = local_path + "/" + local_file;
- LOG(INFO) << "begin to delete local snapshot file: " <<
full_local_file
- << ", it does not exist in remote";
- if (remove(full_local_file.c_str()) != 0) {
- LOG(WARNING) << "failed to delete unknown local file: " <<
full_local_file
- << ", error: " << strerror(errno)
- << ", file size: " << local_filestat.size << ",
ignore it";
- }
- }
+ SnapshotHttpDownloader downloader(remote_tablet_snapshot,
std::move(tablet), *this);
+ downloader.set_report_progress_callback(
+ [this, &report_counter, &finished_num, &total_num]() {
+ return _report_every(10, &report_counter, finished_num,
total_num,
+ TTaskType::type::DOWNLOAD);
+ });
+ RETURN_IF_ERROR(downloader.download());
++finished_num;
}
@@ -874,6 +1186,25 @@ Status SnapshotLoader::move(const std::string&
snapshot_path, TabletSharedPtr ta
return status;
}
+Status SnapshotLoader::_replace_tablet_id(const std::string& file_name,
int64_t tablet_id,
+ std::string* new_file_name) {
+ // eg:
+ // 10007.hdr
+ // 10007_2_2_0_0.idx
+ // 10007_2_2_0_0.dat
+ if (_end_with(file_name, ".hdr")) {
+ std::stringstream ss;
+ ss << tablet_id << ".hdr";
+ *new_file_name = ss.str();
+ return Status::OK();
+ } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) {
+ *new_file_name = file_name;
+ return Status::OK();
+ } else {
+ return Status::InternalError("invalid tablet file name: {}",
file_name);
+ }
+}
+
Status SnapshotLoader::_get_tablet_id_and_schema_hash_from_file_path(const
std::string& src_path,
int64_t*
tablet_id,
int32_t*
schema_hash) {
@@ -941,25 +1272,6 @@ Status
SnapshotLoader::_get_existing_files_from_local(const std::string& local_p
return Status::OK();
}
-Status SnapshotLoader::_replace_tablet_id(const std::string& file_name,
int64_t tablet_id,
- std::string* new_file_name) {
- // eg:
- // 10007.hdr
- // 10007_2_2_0_0.idx
- // 10007_2_2_0_0.dat
- if (_end_with(file_name, ".hdr")) {
- std::stringstream ss;
- ss << tablet_id << ".hdr";
- *new_file_name = ss.str();
- return Status::OK();
- } else if (_end_with(file_name, ".idx") || _end_with(file_name, ".dat")) {
- *new_file_name = file_name;
- return Status::OK();
- } else {
- return Status::InternalError("invalid tablet file name: {}",
file_name);
- }
-}
-
Status SnapshotLoader::_get_tablet_id_from_remote_path(const std::string&
remote_path,
int64_t* tablet_id) {
// eg:
diff --git a/be/src/runtime/snapshot_loader.h b/be/src/runtime/snapshot_loader.h
index 7b1d5a0d942..4913af6dd86 100644
--- a/be/src/runtime/snapshot_loader.h
+++ b/be/src/runtime/snapshot_loader.h
@@ -32,6 +32,7 @@ namespace io {
class RemoteFileSystem;
} // namespace io
+class DataDir;
class TRemoteTabletSnapshot;
class StorageEngine;
@@ -63,6 +64,8 @@ class ExecEnv;
*
*/
class SnapshotLoader {
+ friend class SnapshotHttpDownloader;
+
public:
SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id,
int64_t task_id,
const TNetworkAddress& broker_addr = {},
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 3513e35b541..a2443c3cc4d 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -236,6 +236,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
status.to_thrift(&tstatus);
return;
}
+ // save source rowset id and tablet id
+ rowset_meta_pb.set_source_rowset_id(remote_rowset_id);
+ rowset_meta_pb.set_source_tablet_id(request.remote_tablet_id);
// rewrite rowset meta
rowset_meta_pb.set_tablet_id(local_tablet_id);
rowset_meta_pb.set_partition_id(partition_id);
@@ -494,6 +497,10 @@ void _ingest_binlog(StorageEngine& engine,
IngestBinlogArg* arg) {
std::string remote_file_md5;
RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ LOG(INFO) << "download segment index file to " <<
local_segment_index_path
+ << ", remote md5: " << remote_file_md5
+ << ", remote size: " << segment_index_file_size;
+
std::error_code ec;
// Check file length
uint64_t local_index_file_size =
@@ -546,7 +553,6 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
local_tablet->tablet_path(),
rowset_meta, &rowset);
-
if (!status) {
LOG(WARNING) << "failed to create rowset from rowset meta for remote
tablet"
<< ". rowset_id: " << rowset_meta_pb.rowset_id()
diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp
index 9dc3ee74152..f0f9442bcf5 100644
--- a/be/src/util/stopwatch.hpp
+++ b/be/src/util/stopwatch.hpp
@@ -33,9 +33,12 @@ namespace doris {
template <clockid_t Clock>
class CustomStopWatch {
public:
- CustomStopWatch() {
+ CustomStopWatch(bool auto_start = false) {
_total_time = 0;
_running = false;
+ if (auto_start) {
+ start();
+ }
}
timespec start_time() const { return _start; }
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 1b1afbb9d2b..8904ffc74e4 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -118,6 +118,9 @@ message RowsetMetaPB {
// to indicate whether the data between the segments overlap
optional SegmentsOverlapPB segments_overlap_pb = 51 [default =
OVERLAP_UNKNOWN];
optional int64 compaction_level = 52 [default = 0];
+ // For backup/restore, record the tablet id and rowset id of the source
cluster.
+ optional int64 source_tablet_id = 53;
+ optional string source_rowset_id = 54;
// For cloud
// for data recycling
@@ -203,6 +206,10 @@ message RowsetMetaCloudPB {
// to indicate whether the data between the segments overlap
optional SegmentsOverlapPB segments_overlap_pb = 51 [default =
OVERLAP_UNKNOWN];
+ // For backup/restore, record the tablet id and rowset id of the source
cluster.
+ optional int64 source_tablet_id = 53;
+ optional string source_rowset_id = 54;
+
// cloud
// the field is a vector, rename it
repeated int64 segments_file_size = 100;
diff --git a/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
b/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
index 18c33cc72d0..c65b0aea272 100644
--- a/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
+++ b/regression-test/suites/ccr_syncer_p1/test_backup_restore.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_backup_restore") {
+suite("test_backup_restore_ccr") {
def syncer = getSyncer()
if (!syncer.checkEnableFeatureBinlog()) {
@@ -66,4 +66,11 @@ suite("test_backup_restore") {
target_sql " sync "
res = target_sql "SELECT * FROM ${tableName}"
assertEquals(res.size(), insert_num)
+
+ logger.info("=== Test 2: restore again ===")
+ assertTrue(syncer.restoreSnapshot(true))
+ syncer.waitTargetRestoreFinish()
+ target_sql " sync "
+ res = target_sql "SELECT * FROM ${tableName}"
+ assertEquals(res.size(), insert_num)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]