This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aaec0d89ef3 [Chore](cloud) support save error log file to s3 (#36341)
aaec0d89ef3 is described below
commit aaec0d89ef3fe805cf7722f6e36859c5834ce495
Author: Xin Liao <[email protected]>
AuthorDate: Fri Jun 21 21:01:40 2024 +0800
[Chore](cloud) support save error log file to s3 (#36341)
## Proposed changes
In cloud mode, users may not be able to reach a BE's IP directly, so this change
supports saving the load error log to S3 for convenient user access.
<!--Describe your changes.-->
---
be/src/cloud/config.cpp | 2 ++
be/src/cloud/config.h | 2 ++
be/src/io/fs/azure_obj_storage_client.h | 5 +++
be/src/io/fs/obj_storage_client.h | 3 ++
be/src/io/fs/s3_file_system.cpp | 23 ++++++++++++
be/src/io/fs/s3_file_system.h | 5 +++
be/src/io/fs/s3_obj_storage_client.cpp | 6 ++++
be/src/io/fs/s3_obj_storage_client.h | 2 ++
be/src/runtime/fragment_mgr.cpp | 4 +++
be/src/runtime/runtime_state.cpp | 42 ++++++++++++++++++++++
be/src/runtime/runtime_state.h | 9 ++++-
.../stream_load/test_stream_load_error_url.groovy | 3 ++
12 files changed, 105 insertions(+), 1 deletion(-)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 57373e10859..80522759b84 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -55,6 +55,8 @@ DEFINE_mInt32(schedule_sync_tablets_interval_s, "600");
DEFINE_mInt32(mow_stream_load_commit_retry_times, "10");
+// Only consulted in cloud mode: when true, a load's error log file is uploaded to the
+// cloud storage engine's latest file system (if that is an S3FileSystem) and the
+// error URL returned for the load is a presigned URL instead of a BE-local link.
+DEFINE_mBool(save_load_error_log_to_s3, "false");
+
DEFINE_mInt32(sync_load_for_tablets_thread, "32");
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index de53095e1b3..bf041ba0fa6 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -88,6 +88,8 @@ DECLARE_mInt32(schedule_sync_tablets_interval_s);
// Cloud mow
DECLARE_mInt32(mow_stream_load_commit_retry_times);
+// Cloud mode: upload load error logs to S3 and hand out presigned URLs to them.
+DECLARE_mBool(save_load_error_log_to_s3);
+
// the theads which sync the datas which loaded in other clusters
DECLARE_mInt32(sync_load_for_tablets_thread);
diff --git a/be/src/io/fs/azure_obj_storage_client.h
b/be/src/io/fs/azure_obj_storage_client.h
index a8c2db9d4db..0dbe56b0b69 100644
--- a/be/src/io/fs/azure_obj_storage_client.h
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -55,6 +55,11 @@ public:
std::vector<std::string> objs)
override;
ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts)
override;
ObjectStorageResponse delete_objects_recursively(const
ObjectStoragePathOptions& opts) override;
+    // TODO(ByteYue): to be implemented. Until then this returns a fixed placeholder
+    // string, not a usable URL; `opts` and `expiration_secs` are ignored.
+    // NOTE(review): the placeholder starts with "http", so to_load_error_http_path()
+    // forwards it to users unchanged.
+    std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                       int64_t expiration_secs) override {
+        return "http://azure.to.be.implemented";
+    }
private:
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
diff --git a/be/src/io/fs/obj_storage_client.h
b/be/src/io/fs/obj_storage_client.h
index 2a99bde80f1..cd3db3c7371 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -113,6 +113,9 @@ public:
// According to the prefix, recursively delete all files under the prefix.
virtual ObjectStorageResponse delete_objects_recursively(
const ObjectStoragePathOptions& opts) = 0;
+    // Return a presigned URL that lets users access the object at opts.bucket/opts.key
+    // without credentials; `expiration_secs` is the URL's lifetime in seconds.
+    virtual std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                               int64_t expiration_secs) = 0;
};
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index ffc656bfc0f..0a6048592b7 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -25,6 +25,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
// IWYU pragma: no_include <bits/chrono.h>
#include <aws/core/utils/threading/Executor.h>
+#include <aws/s3/S3Client.h>
#include <chrono> // IWYU pragma: keep
#include <filesystem>
@@ -51,6 +52,9 @@
namespace doris::io {
namespace {
+// An OSS private (intranet) endpoint is the public one with "-internal" inserted
+// before ".aliyuncs.com".
+constexpr std::string_view OSS_PRIVATE_ENDPOINT_SUFFIX = "-internal.aliyuncs.com";
+// The leading part of OSS_PRIVATE_ENDPOINT_SUFFIX that marks the private endpoint.
+constexpr std::string_view OSS_PRIVATE_ENDPOINT_MARKER = "-internal";
+// Derived instead of hand-counted so it cannot drift from the marker text.
+constexpr size_t LEN_OF_OSS_PRIVATE_SUFFIX = OSS_PRIVATE_ENDPOINT_MARKER.size();
+static_assert(OSS_PRIVATE_ENDPOINT_SUFFIX.starts_with(OSS_PRIVATE_ENDPOINT_MARKER));
+
#ifndef CHECK_S3_CLIENT
#define CHECK_S3_CLIENT(client) \
if (!client) { \
@@ -421,4 +425,23 @@ Status S3FileSystem::download_impl(const Path&
remote_file, const Path& local_fi
return Status::OK();
}
+// OSS has a public endpoint and a private (intranet) endpoint; `is_public_endpoint`
+// determines whether the URL is signed against the public one.
+// Returns an empty string when no object storage client is available.
+std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t expiration_secs,
+                                                 bool is_public_endpoint) const {
+    std::string key = fmt::format("{}/{}", _prefix, path.native());
+    // Copy the client conf once instead of re-reading it for the check, the copy and the erase.
+    auto conf = _client->s3_client_conf();
+    std::shared_ptr<ObjStorageClient> client;
+    if (is_public_endpoint && conf.endpoint.ends_with(OSS_PRIVATE_ENDPOINT_SUFFIX)) {
+        // "...-internal.aliyuncs.com" -> "....aliyuncs.com": drop the "-internal" that
+        // starts the private suffix.
+        conf.endpoint.erase(conf.endpoint.size() - OSS_PRIVATE_ENDPOINT_SUFFIX.size(),
+                            LEN_OF_OSS_PRIVATE_SUFFIX);
+        // NOTE(review): this builds a fresh client on every call; consider caching it if
+        // presigning becomes frequent.
+        client = S3ClientFactory::instance().create(conf);
+    } else {
+        client = _client->get();
+    }
+    // Either branch may yield no client (this file's CHECK_S3_CLIENT guards the same case);
+    // return an empty URL rather than dereferencing null.
+    if (!client) {
+        return "";
+    }
+    return client->generate_presigned_url({.bucket = _bucket, .key = key}, expiration_secs);
+}
+
+
} // namespace doris::io
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 96eafb5d18d..d1e8b5b6e31 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -62,6 +62,8 @@ public:
// For error msg
std::string full_s3_path(std::string_view bucket, std::string_view key)
const;
+    // Returns a copy of the client configuration, taken under a shared lock on `_mtx`.
+    // NOTE(review): assumes `_conf` is written under `_mtx` elsewhere in this class — confirm.
+    // Returning by value keeps existing call sites (`auto c = ...`, `...().endpoint`) working.
+    S3ClientConf s3_client_conf() const {
+        std::shared_lock lock(_mtx);
+        return _conf;
+    }
+
private:
mutable std::shared_mutex _mtx;
std::shared_ptr<ObjStorageClient> _client;
@@ -88,6 +90,9 @@ public:
const std::string& bucket() const { return _bucket; }
const std::string& prefix() const { return _prefix; }
+    // Returns a presigned URL for `path` (keyed under prefix() in bucket()), valid for
+    // `expiration_secs` seconds. When `is_public_endpoint` is true and the configured
+    // endpoint is an OSS private ("-internal") endpoint, the URL is signed against the
+    // corresponding public endpoint instead.
+    std::string generate_presigned_url(const Path& path, int64_t expiration_secs,
+                                       bool is_public_endpoint) const;
+
protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) override;
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp
b/be/src/io/fs/s3_obj_storage_client.cpp
index f2ce933cf34..ffe652583c8 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -387,4 +387,10 @@ ObjectStorageResponse
S3ObjStorageClient::delete_objects_recursively(
return {};
}
+// Presigns an HTTP GET for opts.bucket/opts.key. SigV4 presigned URLs must expire within
+// [1, 604800] seconds (7 days), so out-of-range values are clamped; this also keeps a
+// negative int64_t from wrapping to a huge unsigned value in the SDK call.
+std::string S3ObjStorageClient::generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                                       int64_t expiration_secs) {
+    constexpr int64_t MAX_PRESIGNED_URL_EXPIRATION_SECS = 7 * 24 * 60 * 60;
+    if (expiration_secs < 1) {
+        expiration_secs = 1;
+    } else if (expiration_secs > MAX_PRESIGNED_URL_EXPIRATION_SECS) {
+        expiration_secs = MAX_PRESIGNED_URL_EXPIRATION_SECS;
+    }
+    return _client->GeneratePresignedUrl(opts.bucket, opts.key, Aws::Http::HttpMethod::HTTP_GET,
+                                         static_cast<uint64_t>(expiration_secs));
+}
+
} // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h
b/be/src/io/fs/s3_obj_storage_client.h
index ea2b00e58df..0bc2d5ef5af 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -58,6 +58,8 @@ public:
std::vector<std::string> objs)
override;
ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts)
override;
ObjectStorageResponse delete_objects_recursively(const
ObjectStoragePathOptions& opts) override;
+    // Presigns an HTTP GET for opts.bucket/opts.key, valid for `expiration_secs` seconds.
+    std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                       int64_t expiration_secs) override;
private:
std::shared_ptr<Aws::S3::S3Client> _client;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3fe4526de73..da25527b68c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -51,6 +51,7 @@
#include <unordered_map>
#include <utility>
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
@@ -118,6 +119,9 @@ std::string to_load_error_http_path(const std::string&
file_name) {
if (file_name.empty()) {
return "";
}
+    // The value may already be a full URL (presumably a presigned S3 URL produced by
+    // RuntimeState::get_error_log_file_path()); hand it back untouched.
+    if (file_name.starts_with("http")) {
+        return file_name;
+    }
std::stringstream url;
url << "http://" << get_host_port(BackendOptions::get_localhost(),
config::webserver_port)
<< "/api/_load_error_log?"
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 137b5a84642..48d71a64eb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -28,10 +28,14 @@
#include <memory>
#include <string>
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/storage_engine.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
@@ -253,6 +257,20 @@ RuntimeState::~RuntimeState() {
_error_log_file->close();
delete _error_log_file;
_error_log_file = nullptr;
+        if (_s3_error_fs) {
+            std::string error_log_absolute_path =
+                    _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
+            // upload error log file to s3
+            Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
+            if (st.ok()) {
+                // Remove the local copy with the non-throwing overload: a filesystem_error
+                // escaping this destructor would call std::terminate.
+                std::error_code ec;
+                std::filesystem::remove(error_log_absolute_path, ec);
+                if (ec) {
+                    LOG(WARNING) << "Fail to remove local error log file after uploading to s3, path="
+                                 << error_log_absolute_path << ", error=" << ec.message();
+                }
+            } else {
+                // remove local error log file later by clean_expired_temp_path thread
+                LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
+                             << _error_log_file_path << ", error=" << st;
+            }
+        }
}
_obj_pool->clear();
@@ -359,6 +377,19 @@ Status RuntimeState::cancel_reason() const {
const int64_t MAX_ERROR_NUM = 50;
Status RuntimeState::create_error_log_file() {
+    if (config::save_load_error_log_to_s3 && config::is_cloud_mode()) {
+        _s3_error_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+                ExecEnv::GetInstance()->storage_engine().to_cloud().latest_fs());
+        if (_s3_error_fs) {
+            std::stringstream ss;
+            // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_err_packet.html
+            // shorten the path as much as possible to prevent the length of the presigned URL from
+            // exceeding the MySQL error packet size limit
+            //
+            // Each RuntimeState uploads its own log in its destructor, so the key must differ per
+            // fragment instance. Use the low half of the instance id rather than the high half.
+            // NOTE(review): assumes instances of one load share `hi` (derived from the query id)
+            // and differ in `lo` — confirm against the FE's instance-id assignment.
+            ss << "error_log/" << _import_label << "_" << std::hex << _fragment_instance_id.lo;
+            _s3_error_log_file_path = ss.str();
+        }
+    }
+
static_cast<void>(_exec_env->load_path_mgr()->get_load_error_file_name(
_db_name, _import_label, _fragment_instance_id,
&_error_log_file_path));
std::string error_log_absolute_path =
@@ -433,6 +464,17 @@ Status
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
return Status::OK();
}
+std::string RuntimeState::get_error_log_file_path() const {
+    // Not saving to S3: report the local error log path as before.
+    if (!_s3_error_fs) {
+        return _error_log_file_path;
+    }
+    // A presigned URL must expire in less than a week, so stay one second below it.
+    constexpr int64_t expiration_secs = 7 * 24 * 60 * 60 - 1;
+    // The URL is shown to users, so sign it against the public endpoint.
+    return _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, expiration_secs,
+                                                /*is_public_endpoint=*/true);
+}
+
int64_t RuntimeState::get_load_mem_limit() {
// TODO: the code is abandoned, it can be deleted after v1.3
if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit
> 0) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index dbd6e7cca5e..e774b3a2890 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -41,6 +41,8 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/integral_types.h"
+#include "io/fs/file_system.h"
+#include "io/fs/s3_file_system.h"
#include "runtime/task_execution_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
@@ -261,7 +263,7 @@ public:
int64_t load_job_id() const { return _load_job_id; }
- const std::string get_error_log_file_path() const { return
_error_log_file_path; }
+    // Returns a presigned S3 URL for the error log when it is being saved to S3,
+    // otherwise the local error log file path.
+    std::string get_error_log_file_path() const;
// append error msg and error line to file when loading data.
// is_summary is true, means we are going to write the summary line
@@ -738,6 +740,11 @@ private:
RuntimeState(const RuntimeState&);
vectorized::ColumnInt64* _partial_update_auto_inc_column;
+
+    // save error log to s3; set by create_error_log_file() only when
+    // config::save_load_error_log_to_s3 is on in cloud mode
+    std::shared_ptr<io::S3FileSystem> _s3_error_fs;
+    // object path of the uploaded error log, relative to _s3_error_fs's bucket/prefix:
+    // error_log/${label}_${hex of one half of fragment_instance_id}
+    std::string _s3_error_log_file_path;
};
#define RETURN_IF_CANCELLED(state)
\
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
index 72fc212e241..e1e5d82494b 100644
---
a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
@@ -68,6 +68,9 @@ suite("test_stream_load_error_url", "p0") {
log.info("error result: " + out)
assertTrue(out.contains("actual column number in csv file is
more than schema column number.actual number"))
log.info("url: " + json.ErrorURL)
+            // NOTE(review): this expects the cloud-mode BEs to run with
+            // save_load_error_log_to_s3=true (its default is "false") and an S3 (SigV4) backend;
+            // otherwise ErrorURL is not a presigned URL — confirm the regression BE config.
+            if (isCloudMode()) {
+                assertTrue(json.ErrorURL.contains("X-Amz-Signature="))
+            }
}
}
} finally {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]