This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 09b56d5d4d6 [improvement](binlog)Support inverted index in CCR
(#31743) (#36203)
09b56d5d4d6 is described below
commit 09b56d5d4d6c46725b2beffe39d0764ff1f0cda4
Author: qiye <[email protected]>
AuthorDate: Sun Jun 16 09:36:46 2024 +0800
[improvement](binlog)Support inverted index in CCR (#31743) (#36203)
---
be/src/http/action/download_binlog_action.cpp | 39 ++++++++
be/src/olap/rowset/beta_rowset.cpp | 43 ++++++++-
be/src/olap/snapshot_manager.cpp | 53 ++++++++++-
be/src/olap/tablet.cpp | 26 ++++-
be/src/olap/tablet.h | 5 +
be/src/olap/tablet_manager.cpp | 17 +++-
be/src/olap/task/engine_clone_task.cpp | 14 ++-
be/src/service/backend_service.cpp | 132 ++++++++++++++++++++++++--
8 files changed, 307 insertions(+), 22 deletions(-)
diff --git a/be/src/http/action/download_binlog_action.cpp
b/be/src/http/action/download_binlog_action.cpp
index 697512b2a30..dbe2880d3b4 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -47,6 +47,7 @@ const std::string kTabletIdParameter = "tablet_id";
const std::string kBinlogVersionParameter = "binlog_version";
const std::string kRowsetIdParameter = "rowset_id";
const std::string kSegmentIndexParameter = "segment_index";
+const std::string kSegmentIndexIdParameter = "segment_index_id";
// get http param, if no value throw exception
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
@@ -130,6 +131,42 @@ void handle_get_segment_file(HttpRequest* req,
bufferevent_rate_limit_group* rat
do_file_response(segment_file_path, req, rate_limit_group);
}
+/// Handle the "get_segment_index_file" binlog download request.
+///
+/// Required HTTP parameters: tablet_id, rowset_id, segment_index and segment_index_id.
+/// They are combined through Tablet::get_segment_index_filepath() into the path of one
+/// index file, which is then handed to do_file_response().
+///
+/// Replies sent on failure:
+///   - INTERNAL_SERVER_ERROR when building the path throws (get_http_param throws on a
+///     missing parameter) or when the existence check itself fails;
+///   - NOT_FOUND when the resolved path does not exist.
+///
+/// @param req              incoming HTTP request; every reply is sent on it
+/// @param rate_limit_group rate limit group forwarded unchanged to do_file_response()
+void handle_get_segment_index_file(HttpRequest* req,
+                                   bufferevent_rate_limit_group* rate_limit_group) {
+    // Step 1: get download file path
+    std::string segment_index_file_path;
+    try {
+        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
+        auto tablet = get_tablet(tablet_id);
+        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
+        const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
+        // segment_index_id is mandatory like the other parameters, so it is read with
+        // get_http_param too: a missing value is reported by the catch block below
+        // instead of being passed on to the path builder.
+        const auto& segment_index_id = get_http_param(req, kSegmentIndexIdParameter);
+        segment_index_file_path =
+                tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
+    } catch (const std::exception& e) {
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
+        LOG(WARNING) << "get download file path failed, error: " << e.what();
+        return;
+    }
+
+    // Step 2: handle download
+    // Check that the file exists first so a missing file gets an explicit NOT_FOUND reply.
+    bool exists = false;
+    Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists);
+    if (!status.ok()) {
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
+        LOG(WARNING) << "check file exists failed, error: " << status.to_string();
+        return;
+    }
+    if (!exists) {
+        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
+        LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
+        return;
+    }
+    do_file_response(segment_index_file_path, req, rate_limit_group);
+}
+
void handle_get_rowset_meta(HttpRequest* req) {
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
@@ -183,6 +220,8 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
handle_get_binlog_info(req);
} else if (method == "get_segment_file") {
handle_get_segment_file(req, _rate_limit_group.get());
+ } else if (method == "get_segment_index_file") {
+ handle_get_segment_index_file(req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(req);
} else {
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 10d7f326a58..d07b0b2254c 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -454,7 +454,7 @@ Status BetaRowset::add_to_binlog() {
if (fs->type() != io::FileSystemType::LOCAL) {
return Status::InternalError("should be local file system");
}
- io::LocalFileSystem* local_fs =
static_cast<io::LocalFileSystem*>(fs.get());
+ auto* local_fs = static_cast<io::LocalFileSystem*>(fs.get());
// all segments are in the same directory, so cache binlog_dir without
multi times check
std::string binlog_dir;
@@ -462,6 +462,22 @@ Status BetaRowset::add_to_binlog() {
auto segments_num = num_segments();
VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={},
segments_num={}",
rowset_id().to_string(), segments_num);
+
+ Status status;
+ std::vector<string> linked_success_files;
+ Defer remove_linked_files {[&]() { // clear linked files if errors happen
+ if (!status.ok()) {
+ LOG(WARNING) << "will delete linked success files due to error "
<< status;
+ std::vector<io::Path> paths;
+ for (auto& file : linked_success_files) {
+ paths.emplace_back(file);
+ LOG(WARNING) << "will delete linked success file " << file <<
" due to error";
+ }
+ static_cast<void>(local_fs->batch_delete(paths));
+ LOG(WARNING) << "done delete linked success files due to error "
<< status;
+ }
+ }};
+
for (int i = 0; i < segments_num; ++i) {
auto seg_file = segment_file_path(i);
@@ -480,8 +496,29 @@ Status BetaRowset::add_to_binlog() {
.string();
VLOG_DEBUG << "link " << seg_file << " to " << binlog_file;
if (!local_fs->link_file(seg_file, binlog_file).ok()) {
- return Status::Error<OS_ERROR>("fail to create hard link. from={},
to={}, errno={}",
- seg_file, binlog_file, Errno::no());
+ status = Status::Error<OS_ERROR>("fail to create hard link.
from={}, to={}, errno={}",
+ seg_file, binlog_file,
Errno::no());
+ return status;
+ }
+ linked_success_files.push_back(binlog_file);
+
+ for (const auto& index : _schema->indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file =
InvertedIndexDescriptor::get_index_file_name(seg_file, index_id);
+ auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+
std::filesystem::path(index_file).filename())
+ .string();
+ VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
+ if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+ status = Status::Error<OS_ERROR>(
+ "fail to create hard link. from={}, to={}, errno={}",
index_file,
+ binlog_index_file, Errno::no());
+ return status;
+ }
+ linked_success_files.push_back(binlog_index_file);
}
}
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 05e2c771aac..a43a1d187c0 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -647,11 +647,39 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
break;
}
- for (auto& rowset_binlog_meta :
rowset_binlog_metas_pb.rowset_binlog_metas()) {
+ for (const auto& rowset_binlog_meta :
rowset_binlog_metas_pb.rowset_binlog_metas()) {
std::string segment_file_path;
auto num_segments = rowset_binlog_meta.num_segments();
std::string_view rowset_id = rowset_binlog_meta.rowset_id();
+ RowsetMetaPB rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
+ auto err_msg = fmt::format("fail to parse binlog meta data
value:{}",
+ rowset_binlog_meta.data());
+ res = Status::InternalError(err_msg);
+ LOG(WARNING) << err_msg;
+ return res;
+ }
+ const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
+ TabletSchema tablet_schema;
+ tablet_schema.init_from_pb(tablet_schema_pb);
+
+ std::vector<string> linked_success_files;
+ Defer remove_linked_files {[&]() { // clear linked files if errors
happen
+ if (!res.ok()) {
+ LOG(WARNING) << "will delete linked success files due to
error " << res;
+ std::vector<io::Path> paths;
+ for (auto& file : linked_success_files) {
+ paths.emplace_back(file);
+ LOG(WARNING)
+ << "will delete linked success file " << file
<< " due to error";
+ }
+
static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
+ LOG(WARNING) << "done delete linked success files due to
error " << res;
+ }
+ }};
+
+ // link segment files and index files
for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
segment_file_path =
ref_tablet->get_segment_filepath(rowset_id, segment_index);
auto snapshot_segment_file_path =
@@ -664,6 +692,29 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
<< ", dest=" << snapshot_segment_file_path <<
"]";
break;
}
+ linked_success_files.push_back(snapshot_segment_file_path);
+
+ for (const auto& index : tablet_schema.indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ auto index_file = ref_tablet->get_segment_index_filepath(
+ rowset_id, segment_index, index_id);
+ auto snapshot_segment_index_file_path =
+ fmt::format("{}/{}_{}_{}.binlog-index",
schema_full_path, rowset_id,
+ segment_index, index_id);
+ VLOG_DEBUG << "link " << index_file << " to "
+ << snapshot_segment_index_file_path;
+ res = io::global_local_filesystem()->link_file(
+ index_file, snapshot_segment_index_file_path);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to link binlog index file.
[src=" << index_file
+ << ", dest=" <<
snapshot_segment_index_file_path << "]";
+ break;
+ }
+
linked_success_files.push_back(snapshot_segment_index_file_path);
+ }
}
if (!res.ok()) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9eb6f218857..97cbfea9554 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3900,6 +3900,19 @@ std::string
Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id,
segment_index);
}
+// Build the path of a binlog inverted-index file from textual segment / index ids.
+// Resulting layout: <_tablet_path>/_binlog/<rowset_id>_<segment_index>_<index_id>.idx
+// TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
+                                               std::string_view segment_index,
+                                               std::string_view index_id) const {
+    std::string index_file_path = fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
+                                              segment_index, index_id);
+    return index_file_path;
+}
+
+// Build the path of a binlog inverted-index file from numeric segment / index ids.
+// Resulting layout: <_tablet_path>/_binlog/<rowset_id>_<segment_index>_<index_id>.idx
+// TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
+std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
+                                               int64_t index_id) const {
+    auto index_file_path = fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
+                                       segment_index, index_id);
+    return index_file_path;
+}
+
std::vector<std::string> Tablet::get_binlog_filepath(std::string_view
binlog_version) const {
const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version);
std::vector<std::string> binlog_filepath;
@@ -3929,7 +3942,6 @@ void Tablet::gc_binlogs(int64_t version) {
const auto& tablet_uid = this->tablet_uid();
const auto tablet_id = this->tablet_id();
- const auto& tablet_path = this->tablet_path();
std::string begin_key = make_binlog_meta_key_prefix(tablet_uid);
std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1);
LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{},
end_key:{}", tablet_id,
@@ -3943,10 +3955,16 @@ void Tablet::gc_binlogs(int64_t version) {
wait_for_deleted_binlog_keys.emplace_back(key);
wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));
+ // add binlog segment files and index files
for (int64_t i = 0; i < num_segments; ++i) {
- auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
- wait_for_deleted_binlog_files.emplace_back(
- fmt::format("{}/_binlog/{}", tablet_path, segment_file));
+
wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i));
+ for (const auto& index : this->tablet_schema()->indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ wait_for_deleted_binlog_files.emplace_back(
+ get_segment_index_filepath(rowset_id, i,
index.index_id()));
+ }
}
};
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 73f3e2d140d..fd781722d1e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -549,6 +549,11 @@ public:
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t
segment_index) const;
+ std::string get_segment_index_filepath(std::string_view rowset_id,
+ std::string_view segment_index,
+ std::string_view index_id) const;
+ std::string get_segment_index_filepath(std::string_view rowset_id, int64_t
segment_index,
+ int64_t index_id) const;
bool can_add_binlog(uint64_t total_binlog_size) const;
void gc_binlogs(int64_t version);
Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f31cd422caf..75dc5555e39 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -953,19 +953,28 @@ Status TabletManager::load_tablet_from_dir(DataDir*
store, TTabletId tablet_id,
io::global_local_filesystem()->list(schema_hash_path, true,
&files, &exists));
for (auto& file : files) {
auto& filename = file.file_name;
- if (!filename.ends_with(".binlog")) {
+ std::string new_suffix;
+ std::string old_suffix;
+
+ if (filename.ends_with(".binlog")) {
+ old_suffix = ".binlog";
+ new_suffix = ".dat";
+ } else if (filename.ends_with(".binlog-index")) {
+ old_suffix = ".binlog-index";
+ new_suffix = ".idx";
+ } else {
continue;
}
- // change clone_file suffix .binlog to .dat
std::string new_filename = filename;
- new_filename.replace(filename.size() - 7, 7, ".dat");
+ new_filename.replace(filename.size() - old_suffix.size(),
old_suffix.size(),
+ new_suffix);
auto from = fmt::format("{}/{}", schema_hash_path, filename);
auto to = fmt::format("{}/_binlog/{}", schema_hash_path,
new_filename);
RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to));
}
- auto meta = store->get_meta();
+ auto* meta = store->get_meta();
// if ingest binlog metas error, it will be gc in
gc_unused_binlog_metas
RETURN_IF_ERROR(
RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid,
&rowset_binlog_metas_pb));
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 3cf9ef2c4c8..c71f245f58e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -82,10 +82,16 @@ namespace {
/// return value: if binlog file not exist, then return to binlog file path
Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
const std::string& clone_file,
bool* skip_link_file) {
- // change clone_file suffix .binlog to .dat
+ std::string to;
std::string new_clone_file = clone_file;
- new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
- auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
+ if (clone_file.ends_with(".binlog")) {
+ // change clone_file suffix from .binlog to .dat
+ new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
+ } else if (clone_file.ends_with(".binlog-index")) {
+ // change clone_file suffix from .binlog-index to .idx
+ new_clone_file.replace(clone_file.size() - 13, 13, ".idx");
+ }
+ to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
// check to to file exist
bool exists = true;
@@ -674,7 +680,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const
std::string& clone_d
auto from = fmt::format("{}/{}", clone_dir, clone_file);
std::string to;
- if (clone_file.ends_with(".binlog")) {
+ if (clone_file.ends_with(".binlog") ||
clone_file.ends_with(".binlog-index")) {
if (!contain_binlog) {
LOG(WARNING) << "clone binlog file, but not contain binlog
metas. "
<< "tablet=" << tablet->full_name() << ",
clone_file=" << clone_file;
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 339c53cf7dd..745a47d89c0 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -103,6 +103,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
auto& request = arg->request;
TStatus tstatus;
+ std::vector<std::string> download_success_files;
Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
LOG(INFO) << "ingest binlog. result: " <<
apache::thrift::ThriftDebugString(tstatus);
if (tstatus.status_code != TStatusCode::OK) {
@@ -110,6 +111,15 @@ void _ingest_binlog(IngestBinlogArg* arg) {
StorageEngine::instance()->txn_manager()->abort_txn(
partition_id, txn_id, local_tablet_id,
local_tablet->schema_hash(),
local_tablet_uid);
+ // delete all successfully downloaded files
+ LOG(WARNING) << "will delete downloaded success files due to error
" << tstatus;
+ std::vector<io::Path> paths;
+ for (const auto& file : download_success_files) {
+ paths.emplace_back(file);
+ LOG(WARNING) << "will delete downloaded success file " << file
<< " due to error";
+ }
+
static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
+ LOG(WARNING) << "done delete downloaded success files due to error
" << tstatus;
}
if (ingest_binlog_tstatus) {
@@ -224,7 +234,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
}
// Step 5.2: check data capacity
- uint64_t total_size = std::accumulate(segment_file_sizes.begin(),
segment_file_sizes.end(), 0);
+ uint64_t total_size = std::accumulate(segment_file_sizes.begin(),
segment_file_sizes.end(),
+ 0ULL); //
NOLINT(bugprone-fold-init-type)
if (!local_tablet->can_add_binlog(total_size)) {
LOG(WARNING) << "failed to add binlog, no enough space, total_size="
<< total_size
<< ", tablet=" << local_tablet->tablet_id();
@@ -249,10 +260,11 @@ void _ingest_binlog(IngestBinlogArg* arg) {
LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
local_segment_path);
auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
- estimate_timeout](HttpClient* client) {
+ estimate_timeout,
&download_success_files](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
RETURN_IF_ERROR(client->download(local_segment_path));
+ download_success_files.push_back(local_segment_path);
std::error_code ec;
// Check file length
@@ -282,8 +294,116 @@ void _ingest_binlog(IngestBinlogArg* arg) {
}
}
- // Step 6: create rowset && calculate delete bitmap && commit
- // Step 6.1: create rowset
+ // Step 6: get all segment index files
+ // Step 6.1: get all segment index files size
+ std::vector<std::string> segment_index_file_urls;
+ std::vector<uint64_t> segment_index_file_sizes;
+ std::vector<std::string> segment_index_file_names;
+ auto tablet_schema = rowset_meta->tablet_schema();
+ for (const auto& index : tablet_schema->indexes()) {
+ if (index.index_type() != IndexType::INVERTED) {
+ continue;
+ }
+ auto index_id = index.index_id();
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto get_segment_index_file_size_url = fmt::format(
+
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+ "}",
+ binlog_api_url, "get_segment_index_file",
request.remote_tablet_id,
+ remote_rowset_id, segment_index, index_id);
+ uint64_t segment_index_file_size;
+ auto get_segment_index_file_size_cb =
[&get_segment_index_file_size_url,
+
&segment_index_file_size](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ RETURN_IF_ERROR(client->head());
+ return client->get_content_length(&segment_index_file_size);
+ };
+ auto index_file =
InvertedIndexDescriptor::inverted_index_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index, index_id);
+ segment_index_file_names.push_back(index_file);
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_index_file_size_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file size from "
+ << get_segment_index_file_size_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ segment_index_file_sizes.push_back(segment_index_file_size);
+
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+ }
+ }
+
+ // Step 6.2: check data capacity
+ uint64_t total_index_size =
+ std::accumulate(segment_index_file_sizes.begin(),
segment_index_file_sizes.end(),
+ 0ULL); // NOLINT(bugprone-fold-init-type)
+ if (!local_tablet->can_add_binlog(total_index_size)) {
+ LOG(WARNING) << "failed to add binlog, no enough space,
total_index_size="
+ << total_index_size << ", tablet=" <<
local_tablet->tablet_id();
+ status = Status::InternalError("no enough space");
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ // Step 6.3: get all segment index files
+ DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
+ DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
+ for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
+ auto segment_index_file_size = segment_index_file_sizes[i];
+ auto get_segment_index_file_url = segment_index_file_urls[i];
+
+ uint64_t estimate_timeout =
+ segment_index_file_size /
config::download_low_speed_limit_kbps / 1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+
+ auto local_segment_index_path = segment_index_file_names[i];
+ LOG(INFO) << fmt::format("download segment index file from {} to {}",
+ get_segment_index_file_url,
local_segment_index_path);
+ auto get_segment_index_file_cb = [&get_segment_index_file_url,
&local_segment_index_path,
+ segment_index_file_size,
estimate_timeout,
+ &download_success_files](HttpClient*
client) {
+ RETURN_IF_ERROR(client->init(get_segment_index_file_url));
+ client->set_timeout_ms(estimate_timeout * 1000);
+ RETURN_IF_ERROR(client->download(local_segment_index_path));
+ download_success_files.push_back(local_segment_index_path);
+
+ std::error_code ec;
+ // Check file length
+ uint64_t local_index_file_size =
+ std::filesystem::file_size(local_segment_index_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download index file error" << ec.message();
+ return Status::IOError("can't retrive file_size of {}, due to
{}",
+ local_segment_index_path, ec.message());
+ }
+ if (local_index_file_size != segment_index_file_size) {
+ LOG(WARNING) << "download index file length error"
+ << ", get_segment_index_file_url=" <<
get_segment_index_file_url
+ << ", index_file_size=" << segment_index_file_size
+ << ", local_index_file_size=" <<
local_index_file_size;
+ return Status::InternalError("downloaded index file size is
not equal");
+ }
+ return
io::global_local_filesystem()->permission(local_segment_index_path,
+
io::LocalFileSystem::PERMS_OWNER_RW);
+ };
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_index_file_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment index file from " <<
get_segment_index_file_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ }
+
+ // Step 7: create rowset && calculate delete bitmap && commit
+ // Step 7.1: create rowset
RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
local_tablet->tablet_path(),
rowset_meta, &rowset);
@@ -298,7 +418,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
return;
}
- // Step 6.2 calculate delete bitmap before commit
+ // Step 7.2 calculate delete bitmap before commit
auto calc_delete_bitmap_token =
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(local_tablet_id);
@@ -334,7 +454,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
static_cast<void>(calc_delete_bitmap_token->wait());
}
- // Step 6.3: commit txn
+ // Step 7.3: commit txn
Status commit_txn_status =
StorageEngine::instance()->txn_manager()->commit_txn(
local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
rowset_meta->txn_id(), rowset_meta->tablet_id(),
local_tablet->schema_hash(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]