This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e47b3981235 branch-2.1: [chore](binlog) Add download binlog related
configs to BE #47412 (#47585)
e47b3981235 is described below
commit e47b39812350783816ed2512d7b457a2df83f260
Author: walter <[email protected]>
AuthorDate: Sat Feb 8 16:23:23 2025 +0800
branch-2.1: [chore](binlog) Add download binlog related configs to BE
#47412 (#47585)
cherry pick from #47412
---
be/src/common/config.cpp | 4 ++++
be/src/common/config.h | 4 ++++
be/src/runtime/snapshot_loader.cpp | 21 +++++++++++++------
be/src/service/backend_service.cpp | 42 ++++++++++++++++++++------------------
4 files changed, 45 insertions(+), 26 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f047071139e..64083aee9c1 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -241,6 +241,10 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
+// whether to check md5sum when download
+DEFINE_mBool(enable_download_md5sum_check, "true");
+// download binlog meta timeout, default 30s
+DEFINE_mInt32(download_binlog_meta_timeout_ms, "30000");
DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29080a56def..9d08cc78562 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -287,6 +287,10 @@ DECLARE_mInt32(max_download_speed_kbps);
DECLARE_mInt32(download_low_speed_limit_kbps);
// download low speed time(seconds)
DECLARE_mInt32(download_low_speed_time);
+// whether to check md5sum when download
+DECLARE_mBool(enable_download_md5sum_check);
+// download binlog meta timeout
+DECLARE_mInt32(download_binlog_meta_timeout_ms);
// deprecated, use env var LOG_DIR in be.conf
DECLARE_String(sys_log_dir);
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 639701f068c..5d8eae4ca80 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -38,6 +38,7 @@
#include <unordered_map>
#include <utility>
+#include "common/config.h"
#include "common/logging.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@@ -419,9 +420,9 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
- constexpr uint32_t kListRemoteFileTimeout = 15;
+ LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
+ _task_id);
constexpr uint32_t kDownloadFileMaxRetry = 3;
- constexpr uint32_t kGetLengthTimeout = 10;
// check if job has already been cancelled
int tmp_counter = 1;
@@ -502,7 +503,7 @@ Status SnapshotLoader::remote_http_download(
string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient*
client) {
RETURN_IF_ERROR(client->init(remote_url_prefix));
- client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&file_list_str);
};
RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry,
1, list_files_cb));
@@ -518,12 +519,20 @@ Status SnapshotLoader::remote_http_download(
uint64_t file_size = 0;
std::string file_md5;
auto get_file_stat_cb = [&remote_file_url, &file_size,
&file_md5](HttpClient* client) {
- std::string url = fmt::format("{}&acquire_md5=true",
remote_file_url);
+ int64_t timeout_ms = config::download_binlog_meta_timeout_ms;
+ std::string url = remote_file_url;
+ if (config::enable_download_md5sum_check) {
+ // compute md5sum is time-consuming, so we set a longer timeout
+ timeout_ms = config::download_binlog_meta_timeout_ms * 3;
+ url = fmt::format("{}&acquire_md5=true", remote_file_url);
+ }
RETURN_IF_ERROR(client->init(url));
- client->set_timeout_ms(kGetLengthTimeout * 1000);
+ client->set_timeout_ms(timeout_ms);
RETURN_IF_ERROR(client->head());
RETURN_IF_ERROR(client->get_content_length(&file_size));
- RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&file_md5));
+ }
return Status::OK();
};
RETURN_IF_ERROR(
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index a2a04db4c53..9c8080ddaad 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -84,7 +84,7 @@ class TTransportException;
namespace doris {
namespace {
-constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+
struct IngestBinlogArg {
int64_t txn_id;
int64_t partition_id;
@@ -150,6 +150,14 @@ void _ingest_binlog(IngestBinlogArg* arg) {
tstatus.error_msgs.push_back(std::move(error_msg));
};
+ auto estimate_download_timeout = [](int64_t file_size) {
+ uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+ return estimate_timeout;
+ };
+
// Step 3: get binlog info
auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download",
request.remote_host,
request.remote_port);
@@ -161,7 +169,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
std::string binlog_info;
auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient*
client) {
RETURN_IF_ERROR(client->init(get_binlog_info_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&binlog_info);
};
auto status = HttpClient::execute_with_retry(max_retry, 1,
get_binlog_info_cb);
@@ -200,7 +208,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
std::string rowset_meta_str;
auto get_rowset_meta_cb = [&get_rowset_meta_url,
&rowset_meta_str](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_rowset_meta_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&rowset_meta_str);
};
status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
@@ -249,7 +257,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
auto get_segment_file_size_cb = [&get_segment_file_size_url,
&segment_file_size](HttpClient*
client) {
RETURN_IF_ERROR(client->init(get_segment_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return client->get_content_length(&segment_file_size);
};
@@ -285,16 +293,11 @@ void _ingest_binlog(IngestBinlogArg* arg) {
auto get_segment_file_url =
fmt::format("{}&acquire_md5=true",
segment_file_urls[segment_index]);
- uint64_t estimate_timeout =
- segment_file_size / config::download_low_speed_limit_kbps /
1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
auto local_segment_path = BetaRowset::segment_file_path(
local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index);
LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
local_segment_path);
+ uint64_t estimate_timeout =
estimate_download_timeout(segment_file_size);
auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
estimate_timeout,
&download_success_files](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_file_url));
@@ -303,7 +306,9 @@ void _ingest_binlog(IngestBinlogArg* arg) {
download_success_files.push_back(local_segment_path);
std::string remote_file_md5;
- RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ }
LOG(INFO) << "download segment file to " << local_segment_path
<< ", remote md5: " << remote_file_md5
<< ", remote size: " << segment_file_size;
@@ -378,7 +383,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return
client->get_content_length(&segment_index_file_size);
};
@@ -414,7 +419,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
[&get_segment_index_file_size_url,
&segment_index_file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
+ client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
RETURN_IF_ERROR(client->head());
return
client->get_content_length(&segment_index_file_size);
};
@@ -461,12 +466,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
auto get_segment_index_file_url =
fmt::format("{}&acquire_md5=true", segment_index_file_urls[i]);
- uint64_t estimate_timeout =
- segment_index_file_size /
config::download_low_speed_limit_kbps / 1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
+ uint64_t estimate_timeout =
estimate_download_timeout(segment_index_file_size);
auto local_segment_index_path = segment_index_file_names[i];
LOG(INFO) << fmt::format("download segment index file from {} to {}",
get_segment_index_file_url,
local_segment_index_path);
@@ -479,7 +479,9 @@ void _ingest_binlog(IngestBinlogArg* arg) {
download_success_files.push_back(local_segment_index_path);
std::string remote_file_md5;
- RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ if (config::enable_download_md5sum_check) {
+ RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
+ }
std::error_code ec;
// Check file length
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]