This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a7935152e7b branch-3.0: [chore](binlog) Add download binlog related
configs to BE #47412 (#47587)
a7935152e7b is described below
commit a7935152e7b2ff8ccc2693f149da85926009b39a
Author: walter <[email protected]>
AuthorDate: Wed Feb 19 19:21:38 2025 +0800
branch-3.0: [chore](binlog) Add download binlog related configs to BE
#47412 (#47587)
cherry pick from #47412
---
be/src/common/config.cpp | 4 ++++
be/src/common/config.h | 4 ++++
be/src/runtime/snapshot_loader.cpp | 22 ++++++++++++++------
be/src/service/backend_service.cpp | 42 ++++++++++++++++++++------------------
4 files changed, 46 insertions(+), 26 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f2e4ed95640..7b0079f0ce6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -256,6 +256,10 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
DEFINE_mInt32(download_low_speed_time, "300");
// whether to download small files in batch
DEFINE_mBool(enable_batch_download, "false");
+// whether to check the md5sum when downloading
+DEFINE_mBool(enable_download_md5sum_check, "true");
+// download binlog meta timeout, default 30s
+DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000");
DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 64777a532a4..7558b5286d9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -302,6 +302,10 @@ DECLARE_mInt32(download_low_speed_limit_kbps);
DECLARE_mInt32(download_low_speed_time);
// whether to download small files in batch.
DECLARE_mBool(enable_batch_download);
+// whether to check the md5sum when downloading
+DECLARE_mBool(enable_download_md5sum_check);
+// download binlog meta timeout
+DECLARE_mInt32(download_binlog_meta_timeout_ms);
// deprecated, use env var LOG_DIR in be.conf
DECLARE_String(sys_log_dir);
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index 97955d94051..d95bbb7f792 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -35,6 +35,7 @@
#include <unordered_map>
#include <utility>
+#include "common/config.h"
#include "common/logging.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@@ -412,9 +413,10 @@ Status SnapshotLoader::download(const
std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
- constexpr uint32_t kListRemoteFileTimeout = 15;
+ LOG(INFO) << fmt::format("begin to download snapshots via http. job: {},
task id: {}", _job_id,
+ _task_id);
+
constexpr uint32_t kDownloadFileMaxRetry = 3;
- constexpr uint32_t kGetLengthTimeout = 10;
// check if job has already been cancelled
int tmp_counter = 1;
@@ -497,7 +499,7 @@ Status SnapshotLoader::remote_http_download(
string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient*
client) {
RETURN_IF_ERROR(client->init(remote_url_prefix));
- client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&file_list_str);
};
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry,
1, list_files_cb));
@@ -513,12 +515,20 @@ Status SnapshotLoader::remote_http_download(
uint64_t file_size = 0;
std::string file_md5;
auto get_file_stat_cb = [&remote_file_url, &file_size,
&file_md5](HttpClient* client) {
- std::string url = fmt::format("{}&acquire_md5=true",
remote_file_url);
+ int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
+ std::string url = remote_file_url;
+ if (config::enable_download_md5sum_check) {
+ // computing the md5sum is time-consuming, so we set a longer timeout
+ timeout_ms = config::download_binlog_meta_timeout_ms * 3;
+ url = fmt::format("{}&acquire_md5=true", remote_file_url);
+ }
RETURN_IF_ERROR(client->init(url));
- client->set_timeout_ms(kGetLengthTimeout * 1000);
+ client->set_timeout_ms(timeout_ms);
RETURN_IF_ERROR(client->head());
RETURN_IF_ERROR(client->get_content_length(&file_size));
- RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+ }
return Status::OK();
};
RETURN_IF_ERROR(
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 6bb73f37b8c..817f7ffb914 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -90,7 +90,7 @@ class TTransportException;
namespace doris {
namespace {
-constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+
struct IngestBinlogArg {
int64_t txn_id;
int64_t partition_id;
@@ -156,6 +156,14 @@ void _ingest_binlog(StorageEngine& engine,
IngestBinlogArg* arg) {
tstatus.error_msgs.push_back(std::move(error_msg));
};
+ auto estimate_download_timeout = [](int64_t file_size) {
+ uint64_t estimate_timeout = file_size /
config::download_low_speed_limit_kbps / 1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+ return estimate_timeout;
+ };
+
// Step 3: get binlog info
auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download",
request.remote_host,
request.remote_port);
@@ -167,7 +175,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
std::string binlog_info;
auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient*
client) {
RETURN_IF_ERROR(client->init(get_binlog_info_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&binlog_info);
};
auto status = HttpClient::execute_with_retry(max_retry, 1,
get_binlog_info_cb);
@@ -206,7 +214,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
std::string rowset_meta_str;
auto get_rowset_meta_cb = [&get_rowset_meta_url,
&rowset_meta_str](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_rowset_meta_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&rowset_meta_str);
};
status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
@@ -255,7 +263,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
auto get_segment_file_size_cb = [&get_segment_file_size_url,
&segment_file_size](HttpClient*
client) {
RETURN_IF_ERROR(client->init(get_segment_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return client->get_content_length(&segment_file_size);
};
@@ -291,16 +299,11 @@ void _ingest_binlog(StorageEngine& engine,
IngestBinlogArg* arg) {
auto get_segment_file_url =
fmt::format("{}&acquire_md5=true",
segment_file_urls[segment_index]);
- uint64_t estimate_timeout =
- segment_file_size / config::download_low_speed_limit_kbps /
1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
auto segment_path = local_segment_path(local_tablet->tablet_path(),
rowset_meta->rowset_id().to_string(), segment_index);
LOG(INFO) << "download segment file from " << get_segment_file_url <<
" to "
<< segment_path;
+ uint64_t estimate_timeout =
estimate_download_timeout(segment_file_size);
auto get_segment_file_cb = [&get_segment_file_url, &segment_path,
segment_file_size,
estimate_timeout,
&download_success_files](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_file_url));
@@ -309,7 +312,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
download_success_files.push_back(segment_path);
std::string remote_file_md5;
- RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ }
LOG(INFO) << "download segment file to " << segment_path
<< ", remote md5: " << remote_file_md5
<< ", remote size: " << segment_file_size;
@@ -381,7 +386,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return
client->get_content_length(&segment_index_file_size);
};
@@ -420,7 +425,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return
client->get_content_length(&segment_index_file_size);
};
@@ -468,12 +473,7 @@ void _ingest_binlog(StorageEngine& engine,
IngestBinlogArg* arg) {
auto get_segment_index_file_url =
fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);
- uint64_t estimate_timeout =
- segment_index_file_size /
config::download_low_speed_limit_kbps / 1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
+ uint64_t estimate_timeout =
estimate_download_timeout(segment_index_file_size);
auto local_segment_index_path = segment_index_file_names[i];
LOG(INFO) << fmt::format("download segment index file from {} to {}",
get_segment_index_file_url,
local_segment_index_path);
@@ -486,7 +486,9 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
download_success_files.push_back(local_segment_index_path);
std::string remote_file_md5;
- RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ }
std::error_code ec;
// Check file length
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]