This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aaec0d89ef3 [Chore](cloud) support save error log file to s3 (#36341)
aaec0d89ef3 is described below

commit aaec0d89ef3fe805cf7722f6e36859c5834ce495
Author: Xin Liao <[email protected]>
AuthorDate: Fri Jun 21 21:01:40 2024 +0800

    [Chore](cloud) support save error log file to s3 (#36341)
    
    ## Proposed changes
    
    Users may not be able to directly access the BE's IP in cloud mode,
    so this change supports saving the error log to S3 for convenient user access.
    
    <!--Describe your changes.-->
---
 be/src/cloud/config.cpp                            |  2 ++
 be/src/cloud/config.h                              |  2 ++
 be/src/io/fs/azure_obj_storage_client.h            |  5 +++
 be/src/io/fs/obj_storage_client.h                  |  3 ++
 be/src/io/fs/s3_file_system.cpp                    | 23 ++++++++++++
 be/src/io/fs/s3_file_system.h                      |  5 +++
 be/src/io/fs/s3_obj_storage_client.cpp             |  6 ++++
 be/src/io/fs/s3_obj_storage_client.h               |  2 ++
 be/src/runtime/fragment_mgr.cpp                    |  4 +++
 be/src/runtime/runtime_state.cpp                   | 42 ++++++++++++++++++++++
 be/src/runtime/runtime_state.h                     |  9 ++++-
 .../stream_load/test_stream_load_error_url.groovy  |  3 ++
 12 files changed, 105 insertions(+), 1 deletion(-)

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 57373e10859..80522759b84 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -55,6 +55,8 @@ DEFINE_mInt32(schedule_sync_tablets_interval_s, "600");
 
 DEFINE_mInt32(mow_stream_load_commit_retry_times, "10");
 
+DEFINE_mBool(save_load_error_log_to_s3, "false");
+
 DEFINE_mInt32(sync_load_for_tablets_thread, "32");
 
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index de53095e1b3..bf041ba0fa6 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -88,6 +88,8 @@ DECLARE_mInt32(schedule_sync_tablets_interval_s);
 // Cloud mow
 DECLARE_mInt32(mow_stream_load_commit_retry_times);
 
+DECLARE_mBool(save_load_error_log_to_s3);
+
 // the theads which sync the datas which loaded in other clusters
 DECLARE_mInt32(sync_load_for_tablets_thread);
 
diff --git a/be/src/io/fs/azure_obj_storage_client.h 
b/be/src/io/fs/azure_obj_storage_client.h
index a8c2db9d4db..0dbe56b0b69 100644
--- a/be/src/io/fs/azure_obj_storage_client.h
+++ b/be/src/io/fs/azure_obj_storage_client.h
@@ -55,6 +55,11 @@ public:
                                          std::vector<std::string> objs) 
override;
     ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) 
override;
     ObjectStorageResponse delete_objects_recursively(const 
ObjectStoragePathOptions& opts) override;
+    // TODO(ByteYue) : to be implemented
+    std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                       int64_t expiration_secs) override {
+        return "http://azure.to.be.implenmented";
+    };
 
 private:
     std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
diff --git a/be/src/io/fs/obj_storage_client.h 
b/be/src/io/fs/obj_storage_client.h
index 2a99bde80f1..cd3db3c7371 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -113,6 +113,9 @@ public:
     // According to the prefix, recursively delete all files under the prefix.
     virtual ObjectStorageResponse delete_objects_recursively(
             const ObjectStoragePathOptions& opts) = 0;
+    // Return a presigned URL for users to access the object
+    virtual std::string generate_presigned_url(const ObjectStoragePathOptions& 
opts,
+                                               int64_t expiration_secs) = 0;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index ffc656bfc0f..0a6048592b7 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -25,6 +25,7 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 // IWYU pragma: no_include <bits/chrono.h>
 #include <aws/core/utils/threading/Executor.h>
+#include <aws/s3/S3Client.h>
 
 #include <chrono> // IWYU pragma: keep
 #include <filesystem>
@@ -51,6 +52,9 @@
 
 namespace doris::io {
 namespace {
+constexpr std::string_view OSS_PRIVATE_ENDPOINT_SUFFIX = 
"-internal.aliyuncs.com";
+constexpr int LEN_OF_OSS_PRIVATE_SUFFIX = 9; // length of "-internal"
+
 #ifndef CHECK_S3_CLIENT
 #define CHECK_S3_CLIENT(client)                                 \
     if (!client) {                                              \
@@ -421,4 +425,23 @@ Status S3FileSystem::download_impl(const Path& 
remote_file, const Path& local_fi
     return Status::OK();
 }
 
+// oss has public endpoint and private endpoint, is_public_endpoint determines
+// whether to return a public endpoint.
+std::string S3FileSystem::generate_presigned_url(const Path& path, int64_t 
expiration_secs,
+                                                 bool is_public_endpoint) 
const {
+    std::string key = fmt::format("{}/{}", _prefix, path.native());
+    std::shared_ptr<ObjStorageClient> client;
+    if (is_public_endpoint &&
+        
_client->s3_client_conf().endpoint.ends_with(OSS_PRIVATE_ENDPOINT_SUFFIX)) {
+        auto new_s3_conf = _client->s3_client_conf();
+        new_s3_conf.endpoint.erase(
+                _client->s3_client_conf().endpoint.size() - 
OSS_PRIVATE_ENDPOINT_SUFFIX.size(),
+                LEN_OF_OSS_PRIVATE_SUFFIX);
+        client = S3ClientFactory::instance().create(new_s3_conf);
+    } else {
+        client = _client->get();
+    }
+    return client->generate_presigned_url({.bucket = _bucket, .key = key}, 
expiration_secs);
+}
+
 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 96eafb5d18d..d1e8b5b6e31 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -62,6 +62,8 @@ public:
     // For error msg
     std::string full_s3_path(std::string_view bucket, std::string_view key) 
const;
 
+    const S3ClientConf& s3_client_conf() { return _conf; }
+
 private:
     mutable std::shared_mutex _mtx;
     std::shared_ptr<ObjStorageClient> _client;
@@ -88,6 +90,9 @@ public:
     const std::string& bucket() const { return _bucket; }
     const std::string& prefix() const { return _prefix; }
 
+    std::string generate_presigned_url(const Path& path, int64_t 
expiration_secs,
+                                       bool is_public_endpoint) const;
+
 protected:
     Status create_file_impl(const Path& file, FileWriterPtr* writer,
                             const FileWriterOptions* opts) override;
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp 
b/be/src/io/fs/s3_obj_storage_client.cpp
index f2ce933cf34..ffe652583c8 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -387,4 +387,10 @@ ObjectStorageResponse 
S3ObjStorageClient::delete_objects_recursively(
     return {};
 }
 
+std::string S3ObjStorageClient::generate_presigned_url(const 
ObjectStoragePathOptions& opts,
+                                                       int64_t 
expiration_secs) {
+    return _client->GeneratePresignedUrl(opts.bucket, opts.key, 
Aws::Http::HttpMethod::HTTP_GET,
+                                         expiration_secs);
+}
+
 } // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h 
b/be/src/io/fs/s3_obj_storage_client.h
index ea2b00e58df..0bc2d5ef5af 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -58,6 +58,8 @@ public:
                                          std::vector<std::string> objs) 
override;
     ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) 
override;
     ObjectStorageResponse delete_objects_recursively(const 
ObjectStoragePathOptions& opts) override;
+    std::string generate_presigned_url(const ObjectStoragePathOptions& opts,
+                                       int64_t expiration_secs) override;
 
 private:
     std::shared_ptr<Aws::S3::S3Client> _client;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3fe4526de73..da25527b68c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -51,6 +51,7 @@
 #include <unordered_map>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/object_pool.h"
@@ -118,6 +119,9 @@ std::string to_load_error_http_path(const std::string& 
file_name) {
     if (file_name.empty()) {
         return "";
     }
+    if (file_name.compare(0, 4, "http") == 0) {
+        return file_name;
+    }
     std::stringstream url;
     url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
         << "/api/_load_error_log?"
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 137b5a84642..48d71a64eb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -28,10 +28,14 @@
 #include <memory>
 #include <string>
 
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/status.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/storage_engine.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
@@ -253,6 +257,20 @@ RuntimeState::~RuntimeState() {
         _error_log_file->close();
         delete _error_log_file;
         _error_log_file = nullptr;
+        if (_s3_error_fs) {
+            std::string error_log_absolute_path =
+                    
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
+            // upload error log file to s3
+            Status st = _s3_error_fs->upload(error_log_absolute_path, 
_s3_error_log_file_path);
+            if (st.ok()) {
+                // remove local error log file
+                std::filesystem::remove(error_log_absolute_path);
+            } else {
+                // remove local error log file later by clean_expired_temp_path thread
+                LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
+                             << _error_log_file_path << ", error=" << st;
+            }
+        }
     }
 
     _obj_pool->clear();
@@ -359,6 +377,19 @@ Status RuntimeState::cancel_reason() const {
 const int64_t MAX_ERROR_NUM = 50;
 
 Status RuntimeState::create_error_log_file() {
+    if (config::save_load_error_log_to_s3 && config::is_cloud_mode()) {
+        _s3_error_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+                
ExecEnv::GetInstance()->storage_engine().to_cloud().latest_fs());
+        if (_s3_error_fs) {
+            std::stringstream ss;
+            // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_err_packet.html
+            // shorten the path as much as possible to prevent the length of the presigned URL from
+            // exceeding the MySQL error packet size limit
+            ss << "error_log/" << _import_label << "_" << std::hex << 
_fragment_instance_id.hi;
+            _s3_error_log_file_path = ss.str();
+        }
+    }
+
     static_cast<void>(_exec_env->load_path_mgr()->get_load_error_file_name(
             _db_name, _import_label, _fragment_instance_id, 
&_error_log_file_path));
     std::string error_log_absolute_path =
@@ -433,6 +464,17 @@ Status 
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
     return Status::OK();
 }
 
+std::string RuntimeState::get_error_log_file_path() const {
+    if (_s3_error_fs) {
+        // expiration must be less than a week (in seconds) for presigned url
+        static const unsigned EXPIRATION_SECONDS = 7 * 24 * 60 * 60 - 1;
+        // We should return a public endpoint to user.
+        return _s3_error_fs->generate_presigned_url(_s3_error_log_file_path, 
EXPIRATION_SECONDS,
+                                                    true);
+    }
+    return _error_log_file_path;
+}
+
 int64_t RuntimeState::get_load_mem_limit() {
     // TODO: the code is abandoned, it can be deleted after v1.3
     if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit 
> 0) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index dbd6e7cca5e..e774b3a2890 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -41,6 +41,8 @@
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/integral_types.h"
+#include "io/fs/file_system.h"
+#include "io/fs/s3_file_system.h"
 #include "runtime/task_execution_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
@@ -261,7 +263,7 @@ public:
 
     int64_t load_job_id() const { return _load_job_id; }
 
-    const std::string get_error_log_file_path() const { return 
_error_log_file_path; }
+    std::string get_error_log_file_path() const;
 
     // append error msg and error line to file when loading data.
     // is_summary is true, means we are going to write the summary line
@@ -738,6 +740,11 @@ private:
     RuntimeState(const RuntimeState&);
 
     vectorized::ColumnInt64* _partial_update_auto_inc_column;
+
+    // save error log to s3
+    std::shared_ptr<io::S3FileSystem> _s3_error_fs;
+    // error file path on s3, ${bucket}/${prefix}/error_log/${label}_${fragment_instance_id}
+    std::string _s3_error_log_file_path;
 };
 
 #define RETURN_IF_CANCELLED(state)                                             
       \
diff --git 
a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
index 72fc212e241..e1e5d82494b 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
@@ -68,6 +68,9 @@ suite("test_stream_load_error_url", "p0") {
                 log.info("error result: " + out)
                 assertTrue(out.contains("actual column number in csv file is  more than  schema column number.actual number"))
                 log.info("url: " + json.ErrorURL)
+                if (isCloudMode()) {
+                    assertTrue(json.ErrorURL.contains("X-Amz-Signature="))
+                }
             }
         }
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to