This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4100a752379 branch-3.0: [feat](clone) Speed clone tablet via batch
small file downloading #45061 (#45191)
4100a752379 is described below
commit 4100a752379b316dbe011287f25c743613a56fb8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 10 12:31:04 2024 +0800
branch-3.0: [feat](clone) Speed clone tablet via batch small file
downloading #45061 (#45191)
Cherry-picked from #45061
Co-authored-by: walter <[email protected]>
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/gutil/strings/stringpiece.h | 6 +
be/src/http/action/batch_download_action.cpp | 216 ++++++++++++++++++++++
be/src/http/action/batch_download_action.h | 65 +++++++
be/src/http/action/download_binlog_action.cpp | 3 -
be/src/http/http_channel.cpp | 56 +++++-
be/src/http/http_channel.h | 8 +
be/src/http/http_client.cpp | 247 ++++++++++++++++++++++++--
be/src/http/http_client.h | 5 +
be/src/http/utils.cpp | 130 +++++++++++++-
be/src/http/utils.h | 15 +-
be/src/olap/task/engine_clone_task.cpp | 149 ++++++++++++++--
be/src/olap/task/engine_clone_task.h | 3 +
be/src/service/http_service.cpp | 11 ++
be/test/http/http_client_test.cpp | 98 +++++++++-
16 files changed, 975 insertions(+), 41 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1ecd668e428..a1d168e9802 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -232,6 +232,8 @@ DEFINE_mInt32(max_download_speed_kbps, "50000");
DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
+// whether to download small files in batch
+DEFINE_mBool(enable_batch_download, "false");
DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 10ac38d18bb..95b04b56a5c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -282,6 +282,8 @@ DECLARE_mInt32(max_download_speed_kbps);
DECLARE_mInt32(download_low_speed_limit_kbps);
// download low speed time(seconds)
DECLARE_mInt32(download_low_speed_time);
+// whether to download small files in batch.
+DECLARE_mBool(enable_batch_download);
// deprecated, use env var LOG_DIR in be.conf
DECLARE_String(sys_log_dir);
diff --git a/be/src/gutil/strings/stringpiece.h
b/be/src/gutil/strings/stringpiece.h
index 38e36a27099..7a4ebabbf09 100644
--- a/be/src/gutil/strings/stringpiece.h
+++ b/be/src/gutil/strings/stringpiece.h
@@ -149,6 +149,12 @@ public:
assert(length <= static_cast<size_t>(std::numeric_limits<int>::max()));
length_ = static_cast<int>(length);
}
+ StringPiece(std::string_view view) // NOLINT(runtime/explicit)
+ : ptr_(view.data()), length_(0) {
+ size_t length = view.size();
+ assert(length <= static_cast<size_t>(std::numeric_limits<int>::max()));
+ length_ = static_cast<int>(length);
+ }
StringPiece(const char* offset, int len) : ptr_(offset), length_(len) {
assert(len >= 0); }
// Substring of another StringPiece.
diff --git a/be/src/http/action/batch_download_action.cpp
b/be/src/http/action/batch_download_action.cpp
new file mode 100644
index 00000000000..d486883e90b
--- /dev/null
+++ b/be/src/http/action/batch_download_action.cpp
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/batch_download_action.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "gutil/strings/split.h"
+#include "http/http_channel.h"
+#include "http/http_method.h"
+#include "http/http_request.h"
+#include "http/utils.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/security.h"
+
+namespace doris {
+namespace {
+const std::string CHECK_PARAMETER = "check";
+const std::string LIST_PARAMETER = "list";
+const std::string DIR_PARAMETER = "dir";
+const std::string TOKEN_PARAMETER = "token";
+} // namespace
+
+BatchDownloadAction::BatchDownloadAction(
+ ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group,
+ const std::vector<std::string>& allow_dirs)
+ : HttpHandlerWithAuth(exec_env),
_rate_limit_group(std::move(rate_limit_group)) {
+ for (const auto& dir : allow_dirs) {
+ std::string p;
+ Status st = io::global_local_filesystem()->canonicalize(dir, &p);
+ if (!st.ok()) {
+ continue;
+ }
+ _allow_paths.emplace_back(std::move(p));
+ }
+}
+
+void BatchDownloadAction::handle(HttpRequest* req) {
+ if (VLOG_CRITICAL_IS_ON) {
+ VLOG_CRITICAL << "accept one batch download request " <<
req->debug_string();
+ }
+
+ if (req->param(CHECK_PARAMETER) == "true") {
+ // For API support check
+ HttpChannel::send_reply(req, "OK");
+ return;
+ }
+
+ // Get 'dir' parameter, then assembly file absolute path
+ const std::string& dir_path = req->param(DIR_PARAMETER);
+ if (dir_path.empty()) {
+ std::string error_msg =
+ std::string("parameter " + DIR_PARAMETER + " not specified in
url.");
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+ return;
+ }
+
+ if (dir_path.find("..") != std::string::npos) {
+ std::string error_msg = "Not allowed to read relative path: " +
dir_path;
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg);
+ return;
+ }
+
+ Status status;
+ if (config::enable_token_check) {
+ status = _check_token(req);
+ if (!status.ok()) {
+ std::string error_msg = status.to_string();
+ if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
+ HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED,
error_msg);
+ return;
+ } else {
+ HttpChannel::send_reply(req,
HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
+ return;
+ }
+ }
+ }
+
+ status = _check_path_is_allowed(dir_path);
+ if (!status.ok()) {
+ std::string error_msg = status.to_string();
+ if (status.is<ErrorCode::NOT_FOUND>() ||
status.is<ErrorCode::IO_ERROR>()) {
+ HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
+ return;
+ } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
+ HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
+ return;
+ } else {
+ HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
error_msg);
+ return;
+ }
+ }
+
+ bool is_dir = false;
+ status = io::global_local_filesystem()->is_directory(dir_path, &is_dir);
+ if (!status.ok()) {
+ LOG(WARNING) << "handle batch download request: " << status.to_string()
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
status.to_string());
+ return;
+ }
+
+ if (!is_dir) {
+ std::string error_msg = fmt::format("The requested path is not a
directory: {}", dir_path);
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+ return;
+ }
+
+ _handle(req, dir_path);
+
+ VLOG_CRITICAL << "deal with batch download request finished! ";
+}
+
+void BatchDownloadAction::_handle(HttpRequest* req, const std::string&
dir_path) {
+ bool is_list_request = req->param(LIST_PARAMETER) == "true";
+ if (is_list_request) {
+ // return the list of files in the specified directory
+ bool is_acquire_filesize = true;
+ do_dir_response(dir_path, req, is_acquire_filesize);
+ } else {
+ _handle_batch_download(req, dir_path);
+ }
+}
+
+void BatchDownloadAction::_handle_batch_download(HttpRequest* req, const
std::string& dir_path) {
+ std::vector<std::string> files =
+ strings::Split(req->get_request_body(), "\n",
strings::SkipWhitespace());
+ if (files.empty()) {
+ std::string error_msg = "No file specified in request body.";
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+ return;
+ }
+
+ if (files.size() > 64) {
+ std::string error_msg =
+ "The number of files to download in a batch should be less
than 64.";
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+ return;
+ }
+
+ for (const auto& file : files) {
+ if (file.find('/') != std::string::npos) {
+ std::string error_msg =
+ fmt::format("Not allowed to read relative path: {}, dir:
{}", file, dir_path);
+ LOG(WARNING) << "handle batch download request: " << error_msg
+ << ", url: " << mask_token(req->uri());
+ HttpChannel::send_reply(req, HttpStatus::FORBIDDEN, error_msg);
+ return;
+ }
+ }
+
+ HttpChannel::send_files(req, dir_path, std::move(files));
+}
+
+Status BatchDownloadAction::_check_token(HttpRequest* req) {
+ const std::string& token_str = req->param(TOKEN_PARAMETER);
+ if (token_str.empty()) {
+ LOG(WARNING) << "token is not specified in request. url: " <<
mask_token(req->uri());
+ return Status::NotAuthorized("token is not specified.");
+ }
+
+ const std::string& local_token = _exec_env->token();
+ if (token_str != local_token) {
+ LOG(WARNING) << "invalid download token: " << mask_token(token_str)
+ << ", local token: " << mask_token(local_token)
+ << ", url: " << mask_token(req->uri());
+ return Status::NotAuthorized("invalid token {}",
mask_token(token_str));
+ }
+
+ return Status::OK();
+}
+
+Status BatchDownloadAction::_check_path_is_allowed(const std::string&
file_path) {
+ std::string canonical_file_path;
+ RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path,
&canonical_file_path));
+ for (auto& allow_path : _allow_paths) {
+ if (io::LocalFileSystem::contain_path(allow_path,
canonical_file_path)) {
+ return Status::OK();
+ }
+ }
+
+ return Status::NotAuthorized("file path is not allowed: {}",
canonical_file_path);
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/batch_download_action.h
b/be/src/http/action/batch_download_action.h
new file mode 100644
index 00000000000..f0b7e3576b9
--- /dev/null
+++ b/be/src/http/action/batch_download_action.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "http/http_handler.h"
+#include "http/http_handler_with_auth.h"
+#include "util/threadpool.h"
+
+struct bufferevent_rate_limit_group;
+
+namespace doris {
+
+class ExecEnv;
+class HttpRequest;
+
+// A simple handler that serves incoming HTTP batch file-download requests and sends their
+// respective HTTP responses.
+//
+// We use parameter named 'dir' to specify the static resource path, it is an
absolute path.
+//
+// For a HEAD request, this handler returns the list of files in the
specified directory.
+//
+// For a GET request, the file names to download are specified in the request
body as a list of strings,
+// separated by '\n'. To limit resource usage, the maximum number of files to
download in a batch is 64.
+class BatchDownloadAction : public HttpHandlerWithAuth {
+public:
+ BatchDownloadAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group,
+ const std::vector<std::string>& allow_dirs);
+
+ ~BatchDownloadAction() override = default;
+
+ void handle(HttpRequest* req) override;
+
+private:
+ Status _check_token(HttpRequest* req);
+ Status _check_path_is_allowed(const std::string& path);
+
+ void _handle(HttpRequest* req, const std::string& dir_path);
+ void _handle_batch_download(HttpRequest* req, const std::string& dir_path);
+
+ std::vector<std::string> _allow_paths;
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group;
+};
+
+} // end namespace doris
diff --git a/be/src/http/action/download_binlog_action.cpp
b/be/src/http/action/download_binlog_action.cpp
index 54701c5e463..372f840401c 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -21,11 +21,9 @@
#include <fmt/ranges.h>
#include <cstdint>
-#include <limits>
#include <stdexcept>
#include <string_view>
#include <utility>
-#include <vector>
#include "common/config.h"
#include "common/logging.h"
@@ -34,7 +32,6 @@
#include "http/utils.h"
#include "io/fs/local_file_system.h"
#include "olap/storage_engine.h"
-#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 96679195316..312f1ab9286 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -20,8 +20,8 @@
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>
+#include <event2/http_struct.h>
-#include <algorithm>
#include <sstream>
#include <string>
#include <vector>
@@ -57,7 +57,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus
status) {
}
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const
std::string& content) {
- auto evb = evbuffer_new();
+ auto* evb = evbuffer_new();
std::string compressed_content;
if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING),
content,
&compressed_content)) {
@@ -72,7 +72,7 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus
status, const std:
void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t
size,
bufferevent_rate_limit_group* rate_limit_group) {
- auto evb = evbuffer_new();
+ auto* evb = evbuffer_new();
evbuffer_add_file(evb, fd, off, size);
auto* evhttp_request = request->get_evhttp_request();
if (rate_limit_group) {
@@ -84,6 +84,56 @@ void HttpChannel::send_file(HttpRequest* request, int fd,
size_t off, size_t siz
evbuffer_free(evb);
}
+void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir,
+ std::vector<std::string> local_files,
+ bufferevent_rate_limit_group* rate_limit_group) {
+ if (rate_limit_group) {
+ auto* evhttp_request = request->get_evhttp_request();
+ auto* evhttp_connection =
evhttp_request_get_connection(evhttp_request);
+ auto* buffer_event =
evhttp_connection_get_bufferevent(evhttp_connection);
+ bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+ }
+
+ send_files(request, root_dir, std::move(local_files));
+}
+
+void HttpChannel::send_files(HttpRequest* request, const std::string& root_dir,
+ std::vector<std::string> local_files) {
+ std::unique_ptr<evbuffer, decltype(&evbuffer_free)> evb(evbuffer_new(),
&evbuffer_free);
+ for (const std::string& file : local_files) {
+ std::string file_path = fmt::format("{}/{}", root_dir, file);
+ int fd = open(file_path.c_str(), O_RDONLY);
+ if (fd < 0) {
+ std::string error_msg = "Failed to open file: " + file_path;
+ LOG(WARNING) << "http channel send files: " << error_msg;
+ HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg);
+ return;
+ }
+ struct stat st;
+ auto res = fstat(fd, &st);
+ if (res < 0) {
+ close(fd);
+ std::string error_msg = "Failed to open file: " + file_path;
+ LOG(WARNING) << "http channel send files: " << error_msg;
+ HttpChannel::send_reply(request, HttpStatus::NOT_FOUND, error_msg);
+ return;
+ }
+
+ int64_t file_size = st.st_size;
+ VLOG_DEBUG << "http channel send file " << file_path << ", size: " <<
file_size;
+
+ evbuffer_add_printf(evb.get(), "File-Name: %s\r\n", file.c_str());
+ evbuffer_add_printf(evb.get(), "Content-Length: %ld\r\n", file_size);
+ evbuffer_add_printf(evb.get(), "\r\n");
+ if (file_size > 0) {
+ evbuffer_add_file(evb.get(), fd, 0, file_size);
+ }
+ }
+
+ evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
+ default_reason(HttpStatus::OK).c_str(), evb.get());
+}
+
bool HttpChannel::compress_content(const std::string& accept_encoding, const
std::string& input,
std::string* output) {
// Don't bother compressing empty content.
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index ee1e6c0888f..0d5e5d4260a 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -20,6 +20,7 @@
#include <stddef.h>
#include <string>
+#include <vector>
#include "http/http_status.h"
@@ -47,6 +48,13 @@ public:
static void send_file(HttpRequest* request, int fd, size_t off, size_t
size,
bufferevent_rate_limit_group* rate_limit_group =
nullptr);
+ static void send_files(HttpRequest* request, const std::string& root_dir,
+ std::vector<std::string> local_files,
+ bufferevent_rate_limit_group* rate_limit_group);
+
+ static void send_files(HttpRequest* request, const std::string& root_dir,
+ std::vector<std::string> local_files);
+
static bool compress_content(const std::string& accept_encoding, const
std::string& input,
std::string* output);
};
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index fc4c997fce8..767377cea3f 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -24,14 +24,225 @@
#include <ostream>
#include "common/config.h"
+#include "common/status.h"
#include "http/http_headers.h"
-#include "http/http_status.h"
#include "runtime/exec_env.h"
#include "util/security.h"
#include "util/stack_util.h"
namespace doris {
+class MultiFileSplitter {
+public:
+ MultiFileSplitter(std::string local_dir, std::unordered_set<std::string>
expected_files)
+ : _local_dir_path(std::move(local_dir)),
_expected_files(std::move(expected_files)) {}
+ ~MultiFileSplitter() {
+ if (_fd >= 0) {
+ close(_fd);
+ }
+
+ if (!_status.ok() && !downloaded_files.empty()) {
+ LOG(WARNING) << "download files to " << _local_dir_path << "
failed, try remove the "
+ << downloaded_files.size() << " downloaded files";
+ for (const auto& file : downloaded_files) {
+ remove(file.c_str());
+ }
+ }
+ }
+
+ bool append(const char* data, size_t length) {
+ // Already failed.
+ if (!_status.ok()) {
+ return false;
+ }
+
+ std::string buf;
+ if (!_buffer.empty()) {
+ buf.swap(_buffer);
+ buf.append(data, length);
+ data = buf.data();
+ length = buf.size();
+ }
+ return append_inner(data, length);
+ }
+
+ Status finish() {
+ if (_status.ok()) {
+ _status = finish_inner();
+ }
+
+ return _status;
+ }
+
+private:
+ bool append_inner(const char* data, size_t length) {
+ while (length > 0) {
+ int consumed = 0;
+ if (_is_reading_header) {
+ consumed = parse_header(data, length);
+ } else {
+ consumed = append_file(data, length);
+ }
+
+ if (consumed < 0) {
+ return false;
+ }
+
+ DCHECK(consumed <= length);
+ data += consumed;
+ length -= consumed;
+ }
+ return true;
+ }
+
+ int parse_header(const char* data, size_t length) {
+ DCHECK(_fd < 0);
+
+ std::string_view buf(data, length);
+ size_t pos = buf.find("\r\n\r\n");
+ if (pos == std::string::npos) {
+ _buffer.append(data, length);
+ return static_cast<int>(length);
+ }
+
+ // header already read.
+ _is_reading_header = false;
+
+ bool has_file_name = false;
+ bool has_file_size = false;
+ std::string_view header = buf.substr(0, pos);
+ std::vector<std::string> headers =
+ strings::Split(header, "\r\n", strings::SkipWhitespace());
+ for (auto& s : headers) {
+ size_t header_pos = s.find(':');
+ if (header_pos == std::string::npos) {
+ continue;
+ }
+ std::string_view header_view(s);
+ std::string_view key = header_view.substr(0, header_pos);
+ std::string_view value = header_view.substr(header_pos + 1);
+ if (value.starts_with(' ')) {
+ value.remove_prefix(std::min(value.find_first_not_of(' '),
value.size()));
+ }
+ if (key == "File-Name") {
+ _file_name = value;
+ has_file_name = true;
+ } else if (key == "Content-Length") {
+ auto res = std::from_chars(value.data(), value.data() +
value.size(), _file_size);
+ if (res.ec != std::errc()) {
+ std::string error_msg = fmt::format("invalid content
length: {}", value);
+ LOG(WARNING) << "download files to " << _local_dir_path
+ << "failed, err=" << error_msg;
+ _status = Status::HttpError(std::move(error_msg));
+ return -1;
+ }
+ has_file_size = true;
+ }
+ }
+
+ if (!has_file_name || !has_file_size) {
+ std::string error_msg =
+ fmt::format("invalid multi part header, has file name: {},
has file size: {}",
+ has_file_name, has_file_size);
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << error_msg;
+ _status = Status::HttpError(std::move(error_msg));
+ return -1;
+ }
+
+ if (!_expected_files.contains(_file_name)) {
+ std::string error_msg = fmt::format("unexpected file: {}",
_file_name);
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << error_msg;
+ _status = Status::HttpError(std::move(error_msg));
+ return -1;
+ }
+
+ VLOG_DEBUG << "receive file " << _file_name << ", size " << _file_size;
+
+ _written_size = 0;
+ _local_file_path = fmt::format("{}/{}", _local_dir_path, _file_name);
+ _fd = open(_local_file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
0644);
+ if (_fd < 0) {
+ std::string error_msg = "fail to open file to write: " +
_local_file_path;
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << error_msg;
+ _status = Status::IOError(std::move(error_msg));
+ return -1;
+ }
+ downloaded_files.push_back(_local_file_path);
+
+ return static_cast<int>(pos + 4);
+ }
+
+ int append_file(const char* data, size_t length) {
+ DCHECK(_fd >= 0);
+ DCHECK(_file_size >= _written_size);
+
+ size_t write_size = std::min(length, _file_size - _written_size);
+ if (write_size > 0 && write(_fd, data, write_size) < 0) {
+ auto msg = fmt::format("write file failed, file={}, error={}",
_local_file_path,
+ strerror(errno));
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << msg;
+ _status = Status::HttpError(std::move(msg));
+ return -1;
+ }
+
+ _written_size += write_size;
+ if (_written_size == _file_size) {
+ // This file has been downloaded, switch to the next one.
+ switchToNextFile();
+ }
+
+ return write_size;
+ }
+
+ Status finish_inner() {
+ if (!_is_reading_header && _written_size == _file_size) {
+ switchToNextFile();
+ }
+
+ if (_fd >= 0) {
+ // This file is not completely downloaded.
+ close(_fd);
+ _fd = -1;
+ auto error_msg = fmt::format("file {} is not completely
downloaded", _local_file_path);
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << error_msg;
+ return Status::HttpError(std::move(error_msg));
+ }
+
+ if (!_expected_files.empty()) {
+ auto error_msg = fmt::format("not all files are downloaded, {}
missing files",
+ _expected_files.size());
+ LOG(WARNING) << "download files to " << _local_dir_path <<
"failed, err=" << error_msg;
+ return Status::HttpError(std::move(error_msg));
+ }
+
+ downloaded_files.clear();
+ return Status::OK();
+ }
+
+ void switchToNextFile() {
+ DCHECK(_fd >= 0);
+ DCHECK(_written_size == _file_size);
+
+ close(_fd);
+ _fd = -1;
+ _expected_files.erase(_file_name);
+ _is_reading_header = true;
+ }
+
+ const std::string _local_dir_path;
+ std::string _buffer;
+ std::unordered_set<std::string> _expected_files;
+ Status _status;
+
+ bool _is_reading_header = true;
+ int _fd = -1;
+ std::string _local_file_path;
+ std::string _file_name;
+ size_t _file_size = 0;
+ size_t _written_size = 0;
+ std::vector<std::string> downloaded_files;
+};
+
static const char* header_error_msg(CURLHcode code) {
switch (code) {
case CURLHE_OK:
@@ -174,6 +385,12 @@ void HttpClient::set_method(HttpMethod method) {
}
}
+void HttpClient::set_speed_limit() {
+ curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT,
config::download_low_speed_limit_kbps * 1024);
+ curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME,
config::download_low_speed_time);
+ curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE,
config::max_download_speed_kbps * 1024);
+}
+
size_t HttpClient::on_response_data(const void* data, size_t length) {
if (*_callback != nullptr) {
bool is_continue = (*_callback)(data, length);
@@ -184,12 +401,6 @@ size_t HttpClient::on_response_data(const void* data,
size_t length) {
return length;
}
-// Status HttpClient::execute_post_request(const std::string& post_data, const
std::function<bool(const void* data, size_t length)>& callback = {}) {
-// _callback = &callback;
-// set_post_body(post_data);
-// return execute(callback);
-// }
-
Status HttpClient::execute_post_request(const std::string& payload,
std::string* response) {
set_method(POST);
set_payload(payload);
@@ -234,14 +445,8 @@ Status HttpClient::get_content_md5(std::string* md5) const
{
}
Status HttpClient::download(const std::string& local_path) {
- // set method to GET
set_method(GET);
-
- // TODO(zc) Move this download speed limit outside to limit download speed
- // at system level
- curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_LIMIT,
config::download_low_speed_limit_kbps * 1024);
- curl_easy_setopt(_curl, CURLOPT_LOW_SPEED_TIME,
config::download_low_speed_time);
- curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE,
config::max_download_speed_kbps * 1024);
+ set_speed_limit();
auto fp_closer = [](FILE* fp) { fclose(fp); };
std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(local_path.c_str(),
"w"), fp_closer);
@@ -270,6 +475,20 @@ Status HttpClient::download(const std::string& local_path)
{
return status;
}
+Status HttpClient::download_multi_files(const std::string& local_dir,
+ const std::unordered_set<std::string>&
expected_files) {
+ set_speed_limit();
+
+ MultiFileSplitter splitter(local_dir, expected_files);
+ auto callback = [&](const void* data, size_t length) {
+ return splitter.append(reinterpret_cast<const char*>(data), length);
+ };
+ if (auto s = execute(callback); !s.ok()) {
+ return s;
+ }
+ return splitter.finish();
+}
+
Status HttpClient::execute(std::string* response) {
auto callback = [response](const void* data, size_t length) {
response->append((char*)data, length);
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index c0c8863a9b0..a6f2f4fdff5 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -24,6 +24,7 @@
#include <cstdio>
#include <functional>
#include <string>
+#include <unordered_set>
#include "common/status.h"
#include "http/http_headers.h"
@@ -81,6 +82,8 @@ public:
curl_easy_setopt(_curl, CURLOPT_SSL_VERIFYHOST, 0L);
}
+ void set_speed_limit();
+
// TODO(zc): support set header
// void set_header(const std::string& key, const std::string& value) {
// _cntl.http_request().SetHeader(key, value);
@@ -141,6 +144,8 @@ public:
// helper function to download a file, you can call this function to
download
// a file to local_path
Status download(const std::string& local_path);
+ Status download_multi_files(const std::string& local_dir,
+ const std::unordered_set<std::string>&
expected_files);
Status execute_post_request(const std::string& payload, std::string*
response);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index f91610476b4..ee7a78113e5 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -23,6 +23,8 @@
#include <unistd.h>
#include <ostream>
+#include <string>
+#include <unordered_map>
#include <vector>
#include "common/config.h"
@@ -30,6 +32,7 @@
#include "common/status.h"
#include "common/utils.h"
#include "http/http_channel.h"
+#include "http/http_client.h"
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/http_method.h"
@@ -41,10 +44,15 @@
#include "runtime/exec_env.h"
#include "util/md5.h"
#include "util/path_util.h"
+#include "util/security.h"
#include "util/url_coding.h"
namespace doris {
+const uint32_t CHECK_SUPPORT_TIMEOUT = 3;
+const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
+const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
+
std::string encode_basic_auth(const std::string& user, const std::string&
passwd) {
std::string auth = user + ":" + passwd;
std::string encoded_auth;
@@ -190,20 +198,26 @@ void do_file_response(const std::string& file_path,
HttpRequest* req,
HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
}
-void do_dir_response(const std::string& dir_path, HttpRequest* req) {
+void do_dir_response(const std::string& dir_path, HttpRequest* req, bool
is_acquire_filesize) {
bool exists = true;
std::vector<io::FileInfo> files;
Status st = io::global_local_filesystem()->list(dir_path, true, &files,
&exists);
if (!st.ok()) {
LOG(WARNING) << "Failed to scan dir. " << st;
HttpChannel::send_error(req, HttpStatus::INTERNAL_SERVER_ERROR);
+ return;
}
+ VLOG_DEBUG << "list dir: " << dir_path << ", file count: " << files.size();
+
const std::string FILE_DELIMITER_IN_DIR_RESPONSE = "\n";
std::stringstream result;
for (auto& file : files) {
result << file.file_name << FILE_DELIMITER_IN_DIR_RESPONSE;
+ if (is_acquire_filesize) {
+ result << file.file_size << FILE_DELIMITER_IN_DIR_RESPONSE;
+ }
}
std::string result_str = result.str();
@@ -221,4 +235,118 @@ bool load_size_smaller_than_wal_limit(int64_t
content_length) {
return (content_length < 0.8 * max_available_size);
}
+Status is_support_batch_download(const std::string& endpoint) {
+ std::string url =
fmt::format("http://{}/api/_tablet/_batch_download?check=true", endpoint);
+ auto check_support_cb = [&url](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(url));
+ client->set_timeout_ms(CHECK_SUPPORT_TIMEOUT * 1000);
+ client->set_method(HttpMethod::HEAD);
+ std::string response;
+ return client->execute(&response);
+ };
+ return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1,
check_support_cb);
+}
+
+Status list_remote_files_v2(const std::string& address, const std::string&
token,
+ const std::string& remote_dir,
+ std::vector<std::pair<std::string, size_t>>*
file_info_list) {
+ std::string remote_url =
+
fmt::format("http://{}/api/_tablet/_batch_download?token={}&dir={}&list=true",
address,
+ token, remote_dir);
+
+ std::string file_list_str;
+ auto list_files_cb = [&](HttpClient* client) {
+ file_list_str.clear();
+ RETURN_IF_ERROR(client->init(remote_url, false));
+ client->set_method(HttpMethod::GET);
+ client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000);
+ return client->execute(&file_list_str);
+ };
+ Status status = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1,
list_files_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to list remote files from " << remote_url
+ << ", status: " << status.to_string() << ", response: "
<< file_list_str;
+ return status;
+ }
+
+ std::vector<string> file_list = strings::Split(file_list_str, "\n",
strings::SkipWhitespace());
+ if (file_list.size() % 2 != 0) {
+ return Status::InternalError("batch download files: invalid file list,
size is not even");
+ }
+
+ VLOG_DEBUG << "list remote files from " << remote_url
+ << ", file count: " << file_list.size() / 2;
+
+ for (size_t i = 0; i < file_list.size(); i += 2) {
+ uint64_t file_size = 0;
+ try {
+ file_size = std::stoull(file_list[i + 1]);
+ } catch (std::exception&) {
+ return Status::InternalError("batch download files: invalid file
size format: " +
+ file_list[i + 1]);
+ }
+ file_info_list->emplace_back(std::move(file_list[i]), file_size);
+ }
+
+ return Status::OK();
+}
+
+Status download_files_v2(const std::string& address, const std::string& token,
+ const std::string& remote_dir, const std::string&
local_dir,
+ const std::vector<std::pair<std::string, size_t>>&
file_info_list) {
+ std::string remote_url =
fmt::format("http://{}/api/_tablet/_batch_download?dir={}&token={}",
+ address, remote_dir, token);
+
+ size_t batch_file_size = 0;
+ std::unordered_set<std::string> expected_files;
+ std::stringstream ss;
+ for (const auto& file_info : file_info_list) {
+ ss << file_info.first << "\n";
+ batch_file_size += file_info.second;
+ expected_files.insert(file_info.first);
+ }
+ std::string payload = ss.str();
+
+ uint64_t estimate_timeout = batch_file_size /
config::download_low_speed_limit_kbps / 1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+
+ LOG(INFO) << "begin to download files from " << remote_url << " to " <<
local_dir
+ << ", file count: " << file_info_list.size() << ", total size: "
<< batch_file_size
+ << ", timeout: " << estimate_timeout;
+
+ auto callback = [&](HttpClient* client) -> Status {
+ RETURN_IF_ERROR(client->init(remote_url, false));
+ client->set_method(HttpMethod::POST);
+ client->set_payload(payload);
+ client->set_timeout_ms(estimate_timeout * 1000);
+ RETURN_IF_ERROR(client->download_multi_files(local_dir,
expected_files));
+ for (auto&& [file_name, file_size] : file_info_list) {
+ std::string local_file_path = local_dir + "/" + file_name;
+
+ std::error_code ec;
+ // Check file length
+ uint64_t local_file_size =
std::filesystem::file_size(local_file_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download file error: " << ec.message();
+ return Status::IOError("can't retrive file_size of {}, due to
{}", local_file_path,
+ ec.message());
+ }
+ if (local_file_size != file_size) {
+ LOG(WARNING) << "download file length error"
+ << ", remote_path=" << mask_token(remote_url)
+ << ", file_name=" << file_name << ", file_size="
<< file_size
+ << ", local_file_size=" << local_file_size;
+ return Status::InternalError("downloaded file size is not
equal");
+ }
+ RETURN_IF_ERROR(io::global_local_filesystem()->permission(
+ local_file_path, io::LocalFileSystem::PERMS_OWNER_RW));
+ }
+
+ return Status::OK();
+ };
+ return HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1,
callback);
+}
+
} // namespace doris
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 20be6c0fcd7..b9abb7c6208 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -40,9 +40,22 @@ void do_file_response(const std::string& dir_path,
HttpRequest* req,
bufferevent_rate_limit_group* rate_limit_group = nullptr,
bool is_acquire_md5 = false);
-void do_dir_response(const std::string& dir_path, HttpRequest* req);
+void do_dir_response(const std::string& dir_path, HttpRequest* req,
+ bool is_acquire_filesize = false);
std::string get_content_type(const std::string& file_name);
bool load_size_smaller_than_wal_limit(int64_t content_length);
+
+// Whether a backend supports batch download
+Status is_support_batch_download(const std::string& address);
+
+Status list_remote_files_v2(const std::string& address, const std::string&
token,
+ const std::string& remote_dir,
+ std::vector<std::pair<std::string, size_t>>*
file_info_list);
+
+Status download_files_v2(const std::string& address, const std::string& token,
+ const std::string& remote_dir, const std::string&
local_dir,
+ const std::vector<std::pair<std::string, size_t>>&
file_info_list);
+
} // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index bea1d3b1a91..fa8d9b8248e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -44,6 +44,7 @@
#include "gutil/strings/split.h"
#include "gutil/strings/strip.h"
#include "http/http_client.h"
+#include "http/utils.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
@@ -399,28 +400,62 @@ Status
EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
.error(st);
}
}};
- std::string remote_url_prefix;
+
+ std::string remote_dir;
{
std::stringstream ss;
if (snapshot_path->back() == '/') {
- ss << "http://" << get_host_port(src.host, src.http_port) <<
HTTP_REQUEST_PREFIX
- << HTTP_REQUEST_TOKEN_PARAM << token <<
HTTP_REQUEST_FILE_PARAM << *snapshot_path
- << _clone_req.tablet_id << "/" << _clone_req.schema_hash <<
"/";
+ ss << *snapshot_path << _clone_req.tablet_id << "/" <<
_clone_req.schema_hash
+ << "/";
} else {
- ss << "http://" << get_host_port(src.host, src.http_port) <<
HTTP_REQUEST_PREFIX
- << HTTP_REQUEST_TOKEN_PARAM << token <<
HTTP_REQUEST_FILE_PARAM << *snapshot_path
- << "/" << _clone_req.tablet_id << "/" <<
_clone_req.schema_hash << "/";
+ ss << *snapshot_path << "/" << _clone_req.tablet_id << "/" <<
_clone_req.schema_hash
+ << "/";
}
- remote_url_prefix = ss.str();
+ remote_dir = ss.str();
}
- status = _download_files(&data_dir, remote_url_prefix,
local_data_path);
- if (!status.ok()) [[unlikely]] {
- LOG_WARNING("failed to download snapshot from remote BE")
- .tag("url", mask_token(remote_url_prefix))
- .error(status);
- continue; // Try another BE
+ std::string address = get_host_port(src.host, src.http_port);
+ if (config::enable_batch_download &&
is_support_batch_download(address).ok()) {
+ // download files via batch api.
+ LOG_INFO("remote BE supports batch download, use batch file
download")
+ .tag("address", address)
+ .tag("remote_dir", remote_dir);
+ status = _batch_download_files(&data_dir, address, remote_dir,
local_data_path);
+ if (!status.ok()) [[unlikely]] {
+ LOG_WARNING("failed to download snapshot from remote BE in
batch")
+ .tag("address", address)
+ .tag("remote_dir", remote_dir)
+ .error(status);
+ continue; // Try another BE
+ }
+ } else {
+ if (config::enable_batch_download) {
+ LOG_INFO("remote BE does not support batch download, use
single file download")
+ .tag("address", address)
+ .tag("remote_dir", remote_dir);
+ } else {
+ LOG_INFO("batch download is disabled, use single file
download")
+ .tag("address", address)
+ .tag("remote_dir", remote_dir);
+ }
+
+ std::string remote_url_prefix;
+ {
+ std::stringstream ss;
+ ss << "http://" << address << HTTP_REQUEST_PREFIX <<
HTTP_REQUEST_TOKEN_PARAM
+ << token << HTTP_REQUEST_FILE_PARAM << remote_dir;
+ remote_url_prefix = ss.str();
+ }
+
+ status = _download_files(&data_dir, remote_url_prefix,
local_data_path);
+ if (!status.ok()) [[unlikely]] {
+ LOG_WARNING("failed to download snapshot from remote BE")
+ .tag("url", mask_token(remote_url_prefix))
+ .error(status);
+ continue; // Try another BE
+ }
}
+
// No need to try again with another BE
_pending_rs_guards =
DORIS_TRY(_engine.snapshot_mgr()->convert_rowset_ids(
local_data_path, _clone_req.tablet_id, _clone_req.replica_id,
_clone_req.table_id,
@@ -514,7 +549,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir,
const std::string& re
// If the header file is not exist, the table couldn't loaded by olap
engine.
// Avoid of data is not complete, we copy the header file at last.
// The header file's name is end of .hdr.
- for (int i = 0; i < file_name_list.size() - 1; ++i) {
+ for (int i = 0; i + 1 < file_name_list.size(); ++i) {
if (file_name_list[i].ends_with(".hdr")) {
std::swap(file_name_list[i], file_name_list[file_name_list.size()
- 1]);
break;
@@ -593,13 +628,91 @@ Status EngineCloneTask::_download_files(DataDir*
data_dir, const std::string& re
}
_copy_size = (int64_t)total_file_size;
_copy_time_ms = (int64_t)total_time_ms;
- LOG(INFO) << "succeed to copy tablet " << _signature << ", total file
size: " << total_file_size
- << " B"
- << ", cost: " << total_time_ms << " ms"
+ LOG(INFO) << "succeed to copy tablet " << _signature
+ << ", total files: " << file_name_list.size()
+ << ", total file size: " << total_file_size << " B, cost: " <<
total_time_ms << " ms"
<< ", rate: " << copy_rate << " MB/s";
return Status::OK();
}
+Status EngineCloneTask::_batch_download_files(DataDir* data_dir, const
std::string& address,
+ const std::string& remote_dir,
+ const std::string& local_dir) {
+ constexpr size_t BATCH_FILE_SIZE = 64 << 20; // 64MB
+ constexpr size_t BATCH_FILE_NUM = 64;
+
+ // Check local path exist, if exist, remove it, then create the dir
+ // local_file_full_path = tabletid/clone, for a specific tablet, there
should be only one folder
+ // if this folder exists, then should remove it
+ // for example, BE clone from BE 1 to download file 1 with version (2,2),
but clone from BE 1 failed
+ // then it will try to clone from BE 2, but it will find the file 1
already exist, but file 1 with same
+ // name may have different versions.
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(local_dir));
+
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(local_dir));
+
+ const std::string& token = _cluster_info->token;
+ std::vector<std::pair<std::string, size_t>> file_info_list;
+ RETURN_IF_ERROR(list_remote_files_v2(address, token, remote_dir,
&file_info_list));
+
+ // If the header file does not exist, the tablet cannot be loaded by the olap engine.
+ // To avoid incomplete data, we copy the header file last.
+ // The header file's name ends with .hdr.
+ for (int i = 0; i + 1 < file_info_list.size(); ++i) {
+ if (file_info_list[i].first.ends_with(".hdr")) {
+ std::swap(file_info_list[i], file_info_list[file_info_list.size()
- 1]);
+ break;
+ }
+ }
+
+ MonotonicStopWatch watch;
+ watch.start();
+
+ size_t total_file_size = 0;
+ size_t total_files = file_info_list.size();
+ std::vector<std::pair<std::string, size_t>> batch_files;
+ for (size_t i = 0; i < total_files;) {
+ size_t batch_file_size = 0;
+ for (size_t j = i; j < total_files; j++) {
+ // Split batches by file number and file size,
+ if (BATCH_FILE_NUM <= batch_files.size() || BATCH_FILE_SIZE <=
batch_file_size ||
+ // ... or separate the last .hdr file into a single batch.
+ (j + 1 == total_files && !batch_files.empty())) {
+ break;
+ }
+ batch_files.push_back(file_info_list[j]);
+ batch_file_size += file_info_list[j].second;
+ }
+
+ // check disk capacity
+ if (data_dir->reach_capacity_limit(batch_file_size)) {
+ return Status::Error<EXCEEDED_LIMIT>(
+ "reach the capacity limit of path {}, file_size={}",
data_dir->path(),
+ batch_file_size);
+ }
+
+ RETURN_IF_ERROR(download_files_v2(address, token, remote_dir,
local_dir, batch_files));
+
+ total_file_size += batch_file_size;
+ i += batch_files.size();
+ batch_files.clear();
+ }
+
+ uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+ total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
+ double copy_rate = 0.0;
+ if (total_time_ms > 0) {
+ copy_rate = total_file_size / ((double)total_time_ms) / 1000;
+ }
+ _copy_size = (int64_t)total_file_size;
+ _copy_time_ms = (int64_t)total_time_ms;
+ LOG(INFO) << "succeed to copy tablet " << _signature
+ << ", total files: " << file_info_list.size()
+ << ", total file size: " << total_file_size << " B, cost: " <<
total_time_ms << " ms"
+ << ", rate: " << copy_rate << " MB/s";
+
+ return Status::OK();
+}
+
/// This method will only be called if tablet already exist in this BE when
doing clone.
/// This method will do the following things:
/// 1. Link all files from CLONE dir to tablet dir if file does not exist in
tablet dir
diff --git a/be/src/olap/task/engine_clone_task.h
b/be/src/olap/task/engine_clone_task.h
index a11d4c742f4..e2ced28f03c 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -79,6 +79,9 @@ private:
Status _download_files(DataDir* data_dir, const std::string&
remote_url_prefix,
const std::string& local_path);
+ Status _batch_download_files(DataDir* data_dir, const std::string&
endpoint,
+ const std::string& remote_dir, const
std::string& local_dir);
+
Status _make_snapshot(const std::string& ip, int port, TTableId tablet_id,
TSchemaHash schema_hash, int timeout_s,
const std::vector<Version>& missing_versions,
std::string* snapshot_path,
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index a74f00291de..0ee484ec886 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "http/action/adjust_log_level.h"
#include "http/action/adjust_tracing_dump.h"
+#include "http/action/batch_download_action.h"
#include "http/action/be_proc_thread_action.h"
#include "http/action/calc_file_crc_action.h"
#include "http/action/check_rpc_channel_action.h"
@@ -290,6 +291,16 @@ void HttpService::register_local_handler(StorageEngine&
engine) {
tablet_download_action);
_ev_http_server->register_handler(HttpMethod::GET,
"/api/_tablet/_download",
tablet_download_action);
+
+ BatchDownloadAction* batch_download_action =
+ _pool.add(new BatchDownloadAction(_env, _rate_limit_group,
allow_paths));
+ _ev_http_server->register_handler(HttpMethod::HEAD,
"/api/_tablet/_batch_download",
+ batch_download_action);
+ _ev_http_server->register_handler(HttpMethod::GET,
"/api/_tablet/_batch_download",
+ batch_download_action);
+ _ev_http_server->register_handler(HttpMethod::POST,
"/api/_tablet/_batch_download",
+ batch_download_action);
+
if (config::enable_single_replica_load) {
DownloadAction* single_replica_download_action = _pool.add(new
DownloadAction(
_env, nullptr, allow_paths,
config::single_replica_load_download_num_workers));
diff --git a/be/test/http/http_client_test.cpp
b/be/test/http/http_client_test.cpp
index c98328d7c8e..84e4d259ff5 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -25,6 +25,7 @@
#include <unistd.h>
#include <boost/algorithm/string/predicate.hpp>
+#include <filesystem>
#include "gtest/gtest_pred_impl.h"
#include "http/ev_http_server.h"
@@ -102,14 +103,32 @@ public:
}
};
+class HttpBatchDownloadFileHandler : public HttpHandler {
+public:
+ void handle(HttpRequest* req) override {
+ if (req->param("check") == "true") {
+ HttpChannel::send_reply(req, "OK");
+ } else if (req->param("list") == "true") {
+ do_dir_response(req->param("dir"), req, true);
+ } else {
+ std::vector<std::string> acquire_files =
+ strings::Split(req->get_request_body(), "\n",
strings::SkipWhitespace());
+ HttpChannel::send_files(req, req->param("dir"), acquire_files);
+ }
+ }
+};
+
static EvHttpServer* s_server = nullptr;
static int real_port = 0;
static std::string hostname = "";
+static std::string address = "";
+constexpr std::string_view TMP_DIR = "./http_test_tmp";
static HttpClientTestSimpleGetHandler s_simple_get_handler;
static HttpClientTestSimplePostHandler s_simple_post_handler;
static HttpNotFoundHandler s_not_found_handler;
static HttpDownloadFileHandler s_download_file_handler;
+static HttpBatchDownloadFileHandler s_batch_download_file_handler;
class HttpClientTest : public testing::Test {
public:
@@ -123,10 +142,17 @@ public:
s_server->register_handler(POST, "/simple_post",
&s_simple_post_handler);
s_server->register_handler(GET, "/not_found", &s_not_found_handler);
s_server->register_handler(HEAD, "/download_file",
&s_download_file_handler);
+ s_server->register_handler(HEAD, "/api/_tablet/_batch_download",
+ &s_batch_download_file_handler);
+ s_server->register_handler(GET, "/api/_tablet/_batch_download",
+ &s_batch_download_file_handler);
+ s_server->register_handler(POST, "/api/_tablet/_batch_download",
+ &s_batch_download_file_handler);
static_cast<void>(s_server->start());
real_port = s_server->get_real_port();
EXPECT_NE(0, real_port);
- hostname = "http://127.0.0.1:" + std::to_string(real_port);
+ address = "127.0.0.1:" + std::to_string(real_port);
+ hostname = "http://" + address;
}
static void TearDownTestCase() { delete s_server; }
@@ -571,4 +597,74 @@ TEST_F(HttpClientTest, enable_http_auth) {
}
}
+TEST_F(HttpClientTest, batch_download) {
+ EXPECT_TRUE(io::global_local_filesystem()->delete_directory(TMP_DIR).ok());
+ EXPECT_TRUE(io::global_local_filesystem()->create_directory(TMP_DIR).ok());
+
+ std::string root_dir(TMP_DIR);
+ std::string remote_related_dir = root_dir + "/source";
+ std::string local_dir = root_dir + "/target";
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(remote_related_dir).ok());
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_dir).ok());
+
+ std::string remote_dir;
+
EXPECT_TRUE(io::global_local_filesystem()->canonicalize(remote_related_dir,
&remote_dir).ok());
+
+ // 0. create dir source and prepare a large file exceeds 1MB
+ {
+ std::string large_file = remote_dir + "/a_large_file";
+ int fd = open(large_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+ ASSERT_TRUE(fd >= 0);
+ std::string buf = "0123456789";
+ for (int i = 0; i < 10; i++) {
+ buf += buf;
+ }
+ for (int i = 0; i < 1024; i++) {
+ ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0);
+ }
+ close(fd);
+
+ // create some small files.
+ for (int i = 0; i < 32; i++) {
+ std::string small_file = remote_dir + "/small_file_" +
std::to_string(i);
+ fd = open(small_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+ ASSERT_TRUE(fd >= 0);
+ ASSERT_TRUE(write(fd, buf.c_str(), buf.size()) > 0);
+ close(fd);
+ }
+
+ // create an empty file
+ std::string empty_file = remote_dir + "/empty_file";
+ fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+ ASSERT_TRUE(fd >= 0);
+ close(fd);
+
+ empty_file = remote_dir + "/zzzz";
+ fd = open(empty_file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+ ASSERT_TRUE(fd >= 0);
+ close(fd);
+ }
+
+ // 1. check remote support batch download
+ Status st = is_support_batch_download(address);
+ EXPECT_TRUE(st.ok());
+
+ // 2. list remote files
+ std::vector<std::pair<std::string, size_t>> file_info_list;
+ st = list_remote_files_v2(address, "token", remote_dir, &file_info_list);
+ EXPECT_TRUE(st.ok());
+
+ // 3. download files
+ if (file_info_list.size() > 64) {
+ file_info_list.resize(64);
+ }
+
+ // sort file info list by file name
+ std::sort(file_info_list.begin(), file_info_list.end(),
+ [](const auto& a, const auto& b) { return a.first < b.first; });
+
+ st = download_files_v2(address, "token", remote_dir, local_dir,
file_info_list);
+ EXPECT_TRUE(st.ok());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]